mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-11-26 03:45:06 +08:00
feat(drivers): add halalcloud_open driver (#1430)
* 新增清真云Open驱动,支持最新的轻量SDK * Change Go version in go.mod Downgrade Go version from 1.24.2 to 1.23.4 Signed-off-by: zzzhr1990 <zzzhr@hotmail.com> * Apply suggestions from code review * Removed unnecessary comments * Downgraded the Go version to 1.23.4. * Not sure whether FileStream supports concurrent read and write operations, so currently using single-threaded upload to ensure safety. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: zzzhr1990 <zzzhr@hotmail.com> * feat(halalcloud_open): support disk usage * Set useSingleUpload to true for upload safety Not sure whether FileStream supports concurrent read and write operations, so currently using single-threaded upload to ensure safety. Signed-off-by: zzzhr1990 <zzzhr@hotmail.com> * Update meta.go Change required for RefreshToken, If using a personal API approach, the RefreshToken is not required. Signed-off-by: zzzhr1990 <zzzhr@hotmail.com> * remove debug logs * bump halalcloud SDK version * fix unnecessary params * Update drivers/halalcloud_open/driver_init.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: zzzhr1990 <zzzhr@hotmail.com> * Fixed spelling errors; changed hardcoded retry parameters to constants. * remove pointer in get link function in utils.go --------- Signed-off-by: zzzhr1990 <zzzhr@hotmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: KirCute <951206789@qq.com>
This commit is contained in:
@@ -36,6 +36,7 @@ import (
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/google_drive"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/google_photo"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/halalcloud"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/halalcloud_open"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/ilanzou"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/ipfs_api"
|
||||
_ "github.com/OpenListTeam/OpenList/v4/drivers/kodbox"
|
||||
|
||||
111
drivers/halalcloud_open/common.go
Normal file
111
drivers/halalcloud_open/common.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
sdkUser "github.com/halalcloud/golang-sdk-lite/halalcloud/services/user"
|
||||
)
|
||||
|
||||
var (
|
||||
slicePostErrorRetryInterval = time.Second * 120
|
||||
retryTimes = 5
|
||||
)
|
||||
|
||||
type halalCommon struct {
|
||||
// *AuthService // 登录信息
|
||||
UserInfo *sdkUser.User // 用户信息
|
||||
refreshTokenFunc func(token string) error
|
||||
// serv *AuthService
|
||||
configs sync.Map
|
||||
}
|
||||
|
||||
func (m *halalCommon) GetAccessToken() (string, error) {
|
||||
value, exists := m.configs.Load("access_token")
|
||||
if !exists {
|
||||
return "", nil // 如果不存在,返回空字符串
|
||||
}
|
||||
return value.(string), nil // 返回配置项的值
|
||||
}
|
||||
|
||||
// GetRefreshToken implements ConfigStore.
|
||||
func (m *halalCommon) GetRefreshToken() (string, error) {
|
||||
value, exists := m.configs.Load("refresh_token")
|
||||
if !exists {
|
||||
return "", nil // 如果不存在,返回空字符串
|
||||
}
|
||||
return value.(string), nil // 返回配置项的值
|
||||
}
|
||||
|
||||
// SetAccessToken implements ConfigStore.
|
||||
func (m *halalCommon) SetAccessToken(token string) error {
|
||||
m.configs.Store("access_token", token)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetRefreshToken implements ConfigStore.
|
||||
func (m *halalCommon) SetRefreshToken(token string) error {
|
||||
m.configs.Store("refresh_token", token)
|
||||
if m.refreshTokenFunc != nil {
|
||||
return m.refreshTokenFunc(token)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetToken implements ConfigStore.
|
||||
func (m *halalCommon) SetToken(accessToken string, refreshToken string, expiresIn int64) error {
|
||||
m.configs.Store("access_token", accessToken)
|
||||
m.configs.Store("refresh_token", refreshToken)
|
||||
m.configs.Store("expires_in", expiresIn)
|
||||
if m.refreshTokenFunc != nil {
|
||||
return m.refreshTokenFunc(refreshToken)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearConfigs implements ConfigStore.
|
||||
func (m *halalCommon) ClearConfigs() error {
|
||||
m.configs = sync.Map{} // 清空map
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteConfig implements ConfigStore.
|
||||
func (m *halalCommon) DeleteConfig(key string) error {
|
||||
_, exists := m.configs.Load(key)
|
||||
if !exists {
|
||||
return nil // 如果不存在,直接返回
|
||||
}
|
||||
m.configs.Delete(key) // 删除指定的配置项
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConfig implements ConfigStore.
|
||||
func (m *halalCommon) GetConfig(key string) (string, error) {
|
||||
value, exists := m.configs.Load(key)
|
||||
if !exists {
|
||||
return "", nil // 如果不存在,返回空字符串
|
||||
}
|
||||
return value.(string), nil // 返回配置项的值
|
||||
}
|
||||
|
||||
// ListConfigs implements ConfigStore.
|
||||
func (m *halalCommon) ListConfigs() (map[string]string, error) {
|
||||
configs := make(map[string]string)
|
||||
m.configs.Range(func(key, value interface{}) bool {
|
||||
configs[key.(string)] = value.(string) // 将每个配置项添加到map中
|
||||
return true // 继续遍历
|
||||
})
|
||||
return configs, nil // 返回所有配置项
|
||||
}
|
||||
|
||||
// SetConfig implements ConfigStore.
|
||||
func (m *halalCommon) SetConfig(key string, value string) error {
|
||||
m.configs.Store(key, value) // 使用Store方法设置或更新配置项
|
||||
return nil // 成功设置配置项后返回nil
|
||||
}
|
||||
|
||||
func NewHalalCommon() *halalCommon {
|
||||
return &halalCommon{
|
||||
configs: sync.Map{},
|
||||
}
|
||||
}
|
||||
29
drivers/halalcloud_open/driver.go
Normal file
29
drivers/halalcloud_open/driver.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
sdkClient "github.com/halalcloud/golang-sdk-lite/halalcloud/apiclient"
|
||||
sdkUser "github.com/halalcloud/golang-sdk-lite/halalcloud/services/user"
|
||||
sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
|
||||
)
|
||||
|
||||
type HalalCloudOpen struct {
|
||||
*halalCommon
|
||||
model.Storage
|
||||
Addition
|
||||
sdkClient *sdkClient.Client
|
||||
sdkUserFileService *sdkUserFile.UserFileService
|
||||
sdkUserService *sdkUser.UserService
|
||||
uploadThread int
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) Config() driver.Config {
|
||||
return config
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) GetAddition() driver.Additional {
|
||||
return &d.Addition
|
||||
}
|
||||
|
||||
var _ driver.Driver = (*HalalCloudOpen)(nil)
|
||||
131
drivers/halalcloud_open/driver_curd_impl.go
Normal file
131
drivers/halalcloud_open/driver_curd_impl.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
sdkModel "github.com/halalcloud/golang-sdk-lite/halalcloud/model"
|
||||
sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
|
||||
)
|
||||
|
||||
func (d *HalalCloudOpen) getFiles(ctx context.Context, dir model.Obj) ([]model.Obj, error) {
|
||||
|
||||
files := make([]model.Obj, 0)
|
||||
limit := int64(100)
|
||||
token := ""
|
||||
|
||||
for {
|
||||
result, err := d.sdkUserFileService.List(ctx, &sdkUserFile.FileListRequest{
|
||||
Parent: &sdkUserFile.File{Path: dir.GetPath()},
|
||||
ListInfo: &sdkModel.ScanListRequest{
|
||||
Limit: strconv.FormatInt(limit, 10),
|
||||
Token: token,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := 0; len(result.Files) > i; i++ {
|
||||
files = append(files, NewObjFile(result.Files[i]))
|
||||
}
|
||||
|
||||
if result.ListInfo == nil || result.ListInfo.Token == "" {
|
||||
break
|
||||
}
|
||||
token = result.ListInfo.Token
|
||||
|
||||
}
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) makeDir(ctx context.Context, dir model.Obj, name string) (model.Obj, error) {
|
||||
_, err := d.sdkUserFileService.Create(ctx, &sdkUserFile.File{
|
||||
Path: dir.GetPath(),
|
||||
Name: name,
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) move(ctx context.Context, obj model.Obj, dir model.Obj) (model.Obj, error) {
|
||||
oldDir := obj.GetPath()
|
||||
newDir := dir.GetPath()
|
||||
_, err := d.sdkUserFileService.Move(ctx, &sdkUserFile.BatchOperationRequest{
|
||||
Source: []*sdkUserFile.File{
|
||||
{
|
||||
Path: oldDir,
|
||||
},
|
||||
},
|
||||
Dest: &sdkUserFile.File{
|
||||
Path: newDir,
|
||||
},
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) rename(ctx context.Context, obj model.Obj, name string) (model.Obj, error) {
|
||||
|
||||
_, err := d.sdkUserFileService.Rename(ctx, &sdkUserFile.File{
|
||||
Path: obj.GetPath(),
|
||||
Name: name,
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) copy(ctx context.Context, obj model.Obj, dir model.Obj) (model.Obj, error) {
|
||||
id := obj.GetID()
|
||||
sourcePath := obj.GetPath()
|
||||
if len(id) > 0 {
|
||||
sourcePath = ""
|
||||
}
|
||||
|
||||
destID := dir.GetID()
|
||||
destPath := dir.GetPath()
|
||||
if len(destID) > 0 {
|
||||
destPath = ""
|
||||
}
|
||||
dest := &sdkUserFile.File{
|
||||
Path: destPath,
|
||||
Identity: destID,
|
||||
}
|
||||
_, err := d.sdkUserFileService.Copy(ctx, &sdkUserFile.BatchOperationRequest{
|
||||
Source: []*sdkUserFile.File{
|
||||
{
|
||||
Path: sourcePath,
|
||||
Identity: id,
|
||||
},
|
||||
},
|
||||
Dest: dest,
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) remove(ctx context.Context, obj model.Obj) error {
|
||||
id := obj.GetID()
|
||||
_, err := d.sdkUserFileService.Delete(ctx, &sdkUserFile.BatchOperationRequest{
|
||||
Source: []*sdkUserFile.File{
|
||||
{
|
||||
Identity: id,
|
||||
Path: obj.GetPath(),
|
||||
},
|
||||
},
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) details(ctx context.Context) (*model.StorageDetails, error) {
|
||||
ret, err := d.sdkUserService.GetStatisticsAndQuota(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
total := uint64(ret.DiskStatisticsQuota.BytesQuota)
|
||||
|
||||
free := uint64(ret.DiskStatisticsQuota.BytesFree)
|
||||
return &model.StorageDetails{
|
||||
DiskUsage: model.DiskUsage{
|
||||
TotalSpace: total,
|
||||
FreeSpace: free,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
108
drivers/halalcloud_open/driver_get_link.go
Normal file
108
drivers/halalcloud_open/driver_get_link.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
)
|
||||
|
||||
func (d *HalalCloudOpen) getLink(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
|
||||
if args.Redirect {
|
||||
// return nil, model.ErrUnsupported
|
||||
fid := file.GetID()
|
||||
fpath := file.GetPath()
|
||||
if fid != "" {
|
||||
fpath = ""
|
||||
}
|
||||
fi, err := d.sdkUserFileService.GetDirectDownloadAddress(ctx, &sdkUserFile.DirectDownloadRequest{
|
||||
Identity: fid,
|
||||
Path: fpath,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
expireAt := fi.ExpireAt
|
||||
duration := time.Until(time.UnixMilli(expireAt))
|
||||
return &model.Link{
|
||||
URL: fi.DownloadAddress,
|
||||
Expiration: &duration,
|
||||
}, nil
|
||||
}
|
||||
result, err := d.sdkUserFileService.ParseFileSlice(ctx, &sdkUserFile.File{
|
||||
Identity: file.GetID(),
|
||||
Path: file.GetPath(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fileAddrs := []*sdkUserFile.SliceDownloadInfo{}
|
||||
var addressDuration int64
|
||||
|
||||
nodesNumber := len(result.RawNodes)
|
||||
nodesIndex := nodesNumber - 1
|
||||
startIndex, endIndex := 0, nodesIndex
|
||||
for nodesIndex >= 0 {
|
||||
if nodesIndex >= 200 {
|
||||
endIndex = 200
|
||||
} else {
|
||||
endIndex = nodesNumber
|
||||
}
|
||||
for ; endIndex <= nodesNumber; endIndex += 200 {
|
||||
if endIndex == 0 {
|
||||
endIndex = 1
|
||||
}
|
||||
sliceAddress, err := d.sdkUserFileService.GetSliceDownloadAddress(ctx, &sdkUserFile.SliceDownloadAddressRequest{
|
||||
Identity: result.RawNodes[startIndex:endIndex],
|
||||
Version: 1,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addressDuration, _ = strconv.ParseInt(sliceAddress.ExpireAt, 10, 64)
|
||||
fileAddrs = append(fileAddrs, sliceAddress.Addresses...)
|
||||
startIndex = endIndex
|
||||
nodesIndex -= 200
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
size, _ := strconv.ParseInt(result.FileSize, 10, 64)
|
||||
chunks := getChunkSizes(result.Sizes)
|
||||
resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
|
||||
length := httpRange.Length
|
||||
if httpRange.Length < 0 || httpRange.Start+httpRange.Length >= size {
|
||||
length = size - httpRange.Start
|
||||
}
|
||||
oo := &openObject{
|
||||
ctx: ctx,
|
||||
d: fileAddrs,
|
||||
chunk: []byte{},
|
||||
chunks: chunks,
|
||||
skip: httpRange.Start,
|
||||
sha: result.Sha1,
|
||||
shaTemp: sha1.New(),
|
||||
}
|
||||
|
||||
return readers.NewLimitedReadCloser(oo, length), nil
|
||||
}
|
||||
|
||||
var duration time.Duration
|
||||
if addressDuration != 0 {
|
||||
duration = time.Until(time.UnixMilli(addressDuration))
|
||||
} else {
|
||||
duration = time.Until(time.Now().Add(time.Hour))
|
||||
}
|
||||
|
||||
return &model.Link{
|
||||
RangeReader: stream.RateLimitRangeReaderFunc(resultRangeReader),
|
||||
Expiration: &duration,
|
||||
}, nil
|
||||
}
|
||||
50
drivers/halalcloud_open/driver_init.go
Normal file
50
drivers/halalcloud_open/driver_init.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/halalcloud/golang-sdk-lite/halalcloud/apiclient"
|
||||
sdkUser "github.com/halalcloud/golang-sdk-lite/halalcloud/services/user"
|
||||
sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
|
||||
)
|
||||
|
||||
func (d *HalalCloudOpen) Init(ctx context.Context) error {
|
||||
if d.uploadThread < 1 || d.uploadThread > 32 {
|
||||
d.uploadThread, d.UploadThread = 3, 3
|
||||
}
|
||||
if d.halalCommon == nil {
|
||||
d.halalCommon = &halalCommon{
|
||||
UserInfo: &sdkUser.User{},
|
||||
refreshTokenFunc: func(token string) error {
|
||||
d.Addition.RefreshToken = token
|
||||
op.MustSaveDriverStorage(d)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
if d.Addition.RefreshToken != "" {
|
||||
d.halalCommon.SetRefreshToken(d.Addition.RefreshToken)
|
||||
}
|
||||
timeout := d.Addition.TimeOut
|
||||
if timeout <= 0 {
|
||||
timeout = 60
|
||||
}
|
||||
host := d.Addition.Host
|
||||
if host == "" {
|
||||
host = "openapi.2dland.cn"
|
||||
}
|
||||
|
||||
client := apiclient.NewClient(nil, host, d.Addition.ClientID, d.Addition.ClientSecret, d.halalCommon, apiclient.WithTimeout(time.Second*time.Duration(timeout)))
|
||||
d.sdkClient = client
|
||||
d.sdkUserFileService = sdkUserFile.NewUserFileService(client)
|
||||
d.sdkUserService = sdkUser.NewUserService(client)
|
||||
userInfo, err := d.sdkUserService.Get(ctx, &sdkUser.User{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.halalCommon.UserInfo = userInfo
|
||||
// 能够获取到用户信息,已经检查了 RefreshToken 的有效性,无需再次检查
|
||||
return nil
|
||||
}
|
||||
48
drivers/halalcloud_open/driver_interface.go
Normal file
48
drivers/halalcloud_open/driver_interface.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
)
|
||||
|
||||
func (d *HalalCloudOpen) Drop(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
|
||||
return d.getFiles(ctx, dir)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
|
||||
return d.getLink(ctx, file, args)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) {
|
||||
return d.makeDir(ctx, parentDir, dirName)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
|
||||
return d.move(ctx, srcObj, dstDir)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) {
|
||||
return d.rename(ctx, srcObj, newName)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
|
||||
return d.copy(ctx, srcObj, dstDir)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) Remove(ctx context.Context, obj model.Obj) error {
|
||||
return d.remove(ctx, obj)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
|
||||
return d.put(ctx, dstDir, stream, up)
|
||||
}
|
||||
|
||||
func (d *HalalCloudOpen) GetDetails(ctx context.Context) (*model.StorageDetails, error) {
|
||||
return d.details(ctx)
|
||||
}
|
||||
258
drivers/halalcloud_open/halalcloud_upload.go
Normal file
258
drivers/halalcloud_open/halalcloud_upload.go
Normal file
@@ -0,0 +1,258 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
func (d *HalalCloudOpen) put(ctx context.Context, dstDir model.Obj, fileStream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
|
||||
|
||||
newPath := path.Join(dstDir.GetPath(), fileStream.GetName())
|
||||
|
||||
uploadTask, err := d.sdkUserFileService.CreateUploadTask(ctx, &sdkUserFile.File{
|
||||
Path: newPath,
|
||||
Size: fileStream.GetSize(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if uploadTask.Created {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
slicesList := make([]string, 0)
|
||||
codec := uint64(0x55)
|
||||
if uploadTask.BlockCodec > 0 {
|
||||
codec = uint64(uploadTask.BlockCodec)
|
||||
}
|
||||
blockHashType := uploadTask.BlockHashType
|
||||
mhType := uint64(0x12)
|
||||
if blockHashType > 0 {
|
||||
mhType = uint64(blockHashType)
|
||||
}
|
||||
prefix := cid.Prefix{
|
||||
Codec: codec,
|
||||
MhLength: -1,
|
||||
MhType: mhType,
|
||||
Version: 1,
|
||||
}
|
||||
blockSize := uploadTask.BlockSize
|
||||
useSingleUpload := true
|
||||
//
|
||||
if fileStream.GetSize() <= int64(blockSize) || d.uploadThread <= 1 {
|
||||
useSingleUpload = true
|
||||
}
|
||||
// Not sure whether FileStream supports concurrent read and write operations, so currently using single-threaded upload to ensure safety.
|
||||
// read file
|
||||
if useSingleUpload {
|
||||
bufferSize := int(blockSize)
|
||||
buffer := make([]byte, bufferSize)
|
||||
reader := driver.NewLimitedUploadStream(ctx, fileStream)
|
||||
teeReader := io.TeeReader(reader, driver.NewProgress(fileStream.GetSize(), up))
|
||||
// fileStream.Seek(0, os.SEEK_SET)
|
||||
for {
|
||||
n, err := teeReader.Read(buffer)
|
||||
if n > 0 {
|
||||
data := buffer[:n]
|
||||
uploadCid, err := postFileSlice(ctx, data, uploadTask.Task, uploadTask.UploadAddress, prefix, retryTimes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
slicesList = append(slicesList, uploadCid.String())
|
||||
}
|
||||
if err == io.EOF || n == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: implement multipart upload, currently using single-threaded upload to ensure safety.
|
||||
bufferSize := int(blockSize)
|
||||
buffer := make([]byte, bufferSize)
|
||||
reader := driver.NewLimitedUploadStream(ctx, fileStream)
|
||||
teeReader := io.TeeReader(reader, driver.NewProgress(fileStream.GetSize(), up))
|
||||
for {
|
||||
n, err := teeReader.Read(buffer)
|
||||
if n > 0 {
|
||||
data := buffer[:n]
|
||||
uploadCid, err := postFileSlice(ctx, data, uploadTask.Task, uploadTask.UploadAddress, prefix, retryTimes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
slicesList = append(slicesList, uploadCid.String())
|
||||
}
|
||||
if err == io.EOF || n == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
newFile, err := makeFile(ctx, slicesList, uploadTask.Task, uploadTask.UploadAddress, retryTimes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewObjFile(newFile), nil
|
||||
|
||||
}
|
||||
|
||||
func makeFile(ctx context.Context, fileSlice []string, taskID string, uploadAddress string, retry int) (*sdkUserFile.File, error) {
|
||||
var lastError error = nil
|
||||
for range retry {
|
||||
newFile, err := doMakeFile(fileSlice, taskID, uploadAddress)
|
||||
if err == nil {
|
||||
return newFile, nil
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, err
|
||||
}
|
||||
lastError = err
|
||||
time.Sleep(slicePostErrorRetryInterval)
|
||||
}
|
||||
return nil, fmt.Errorf("mk file slice failed after %d times, error: %s", retry, lastError.Error())
|
||||
}
|
||||
|
||||
func doMakeFile(fileSlice []string, taskID string, uploadAddress string) (*sdkUserFile.File, error) {
|
||||
accessUrl := uploadAddress + "/" + taskID
|
||||
getTimeOut := time.Minute * 2
|
||||
u, err := url.Parse(accessUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
n, _ := json.Marshal(fileSlice)
|
||||
httpRequest := http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: u,
|
||||
Header: map[string][]string{
|
||||
"Accept": {"application/json"},
|
||||
"Content-Type": {"application/json"},
|
||||
//"Content-Length": {fmt.Sprintf("%d", len(n))},
|
||||
},
|
||||
Body: io.NopCloser(bytes.NewReader(n)),
|
||||
}
|
||||
httpClient := http.Client{
|
||||
Timeout: getTimeOut,
|
||||
}
|
||||
httpResponse, err := httpClient.Do(&httpRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer httpResponse.Body.Close()
|
||||
if httpResponse.StatusCode != http.StatusOK && httpResponse.StatusCode != http.StatusCreated {
|
||||
b, _ := io.ReadAll(httpResponse.Body)
|
||||
message := string(b)
|
||||
return nil, fmt.Errorf("mk file slice failed, status code: %d, message: %s", httpResponse.StatusCode, message)
|
||||
}
|
||||
b, _ := io.ReadAll(httpResponse.Body)
|
||||
var result *sdkUserFile.File
|
||||
err = json.Unmarshal(b, &result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
func postFileSlice(ctx context.Context, fileSlice []byte, taskID string, uploadAddress string, preix cid.Prefix, retry int) (cid.Cid, error) {
|
||||
var lastError error = nil
|
||||
for range retry {
|
||||
newCid, err := doPostFileSlice(fileSlice, taskID, uploadAddress, preix)
|
||||
if err == nil {
|
||||
return newCid, nil
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
time.Sleep(slicePostErrorRetryInterval)
|
||||
lastError = err
|
||||
}
|
||||
return cid.Undef, fmt.Errorf("upload file slice failed after %d times, error: %s", retry, lastError.Error())
|
||||
}
|
||||
func doPostFileSlice(fileSlice []byte, taskID string, uploadAddress string, preix cid.Prefix) (cid.Cid, error) {
|
||||
// 1. sum file slice
|
||||
newCid, err := preix.Sum(fileSlice)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
// 2. post file slice
|
||||
sliceCidString := newCid.String()
|
||||
// /{taskID}/{sliceID}
|
||||
accessUrl := uploadAddress + "/" + taskID + "/" + sliceCidString
|
||||
getTimeOut := time.Second * 30
|
||||
// get {accessUrl} in {getTimeOut}
|
||||
u, err := url.Parse(accessUrl)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
// header: accept: application/json
|
||||
// header: content-type: application/octet-stream
|
||||
// header: content-length: {fileSlice.length}
|
||||
// header: x-content-cid: {sliceCidString}
|
||||
// header: x-task-id: {taskID}
|
||||
httpRequest := http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: u,
|
||||
Header: map[string][]string{
|
||||
"Accept": {"application/json"},
|
||||
},
|
||||
}
|
||||
httpClient := http.Client{
|
||||
Timeout: getTimeOut,
|
||||
}
|
||||
httpResponse, err := httpClient.Do(&httpRequest)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
if httpResponse.StatusCode != http.StatusOK {
|
||||
return cid.Undef, fmt.Errorf("upload file slice failed, status code: %d", httpResponse.StatusCode)
|
||||
}
|
||||
var result bool
|
||||
b, err := io.ReadAll(httpResponse.Body)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
err = json.Unmarshal(b, &result)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
if result {
|
||||
return newCid, nil
|
||||
}
|
||||
|
||||
httpRequest = http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: u,
|
||||
Header: map[string][]string{
|
||||
"Accept": {"application/json"},
|
||||
"Content-Type": {"application/octet-stream"},
|
||||
// "Content-Length": {fmt.Sprintf("%d", len(fileSlice))},
|
||||
},
|
||||
Body: io.NopCloser(bytes.NewReader(fileSlice)),
|
||||
}
|
||||
httpResponse, err = httpClient.Do(&httpRequest)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
defer httpResponse.Body.Close()
|
||||
if httpResponse.StatusCode != http.StatusOK && httpResponse.StatusCode != http.StatusCreated {
|
||||
b, _ := io.ReadAll(httpResponse.Body)
|
||||
message := string(b)
|
||||
return cid.Undef, fmt.Errorf("upload file slice failed, status code: %d, message: %s", httpResponse.StatusCode, message)
|
||||
}
|
||||
//
|
||||
|
||||
return newCid, nil
|
||||
}
|
||||
32
drivers/halalcloud_open/meta.go
Normal file
32
drivers/halalcloud_open/meta.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
)
|
||||
|
||||
type Addition struct {
|
||||
// Usually one of two
|
||||
driver.RootPath
|
||||
// define other
|
||||
RefreshToken string `json:"refresh_token" required:"false" help:"If using a personal API approach, the RefreshToken is not required."`
|
||||
UploadThread int `json:"upload_thread" type:"number" default:"3" help:"1 <= thread <= 32"`
|
||||
|
||||
ClientID string `json:"client_id" required:"true" default:""`
|
||||
ClientSecret string `json:"client_secret" required:"true" default:""`
|
||||
Host string `json:"host" required:"false" default:"openapi.2dland.cn"`
|
||||
TimeOut int `json:"timeout" type:"number" default:"60" help:"timeout in seconds"`
|
||||
}
|
||||
|
||||
var config = driver.Config{
|
||||
Name: "HalalCloudOpen",
|
||||
OnlyProxy: false,
|
||||
DefaultRoot: "/",
|
||||
NoLinkURL: false,
|
||||
}
|
||||
|
||||
func init() {
|
||||
op.RegisterDriver(func() driver.Driver {
|
||||
return &HalalCloudOpen{}
|
||||
})
|
||||
}
|
||||
60
drivers/halalcloud_open/obj_file.go
Normal file
60
drivers/halalcloud_open/obj_file.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
|
||||
)
|
||||
|
||||
type ObjFile struct {
|
||||
sdkFile *sdkUserFile.File
|
||||
fileSize int64
|
||||
modTime time.Time
|
||||
createTime time.Time
|
||||
}
|
||||
|
||||
func NewObjFile(f *sdkUserFile.File) model.Obj {
|
||||
ofile := &ObjFile{sdkFile: f}
|
||||
ofile.fileSize = f.Size
|
||||
modTimeTs := f.UpdateTs
|
||||
ofile.modTime = time.UnixMilli(modTimeTs)
|
||||
createTimeTs := f.CreateTs
|
||||
ofile.createTime = time.UnixMilli(createTimeTs)
|
||||
return ofile
|
||||
}
|
||||
|
||||
func (f *ObjFile) GetSize() int64 {
|
||||
return f.fileSize
|
||||
}
|
||||
|
||||
func (f *ObjFile) GetName() string {
|
||||
return f.sdkFile.Name
|
||||
}
|
||||
|
||||
func (f *ObjFile) ModTime() time.Time {
|
||||
return f.modTime
|
||||
}
|
||||
|
||||
func (f *ObjFile) IsDir() bool {
|
||||
return f.sdkFile.Dir
|
||||
}
|
||||
|
||||
func (f *ObjFile) GetHash() utils.HashInfo {
|
||||
return utils.HashInfo{
|
||||
// TODO: support more hash types
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ObjFile) GetID() string {
|
||||
return f.sdkFile.Identity
|
||||
}
|
||||
|
||||
func (f *ObjFile) GetPath() string {
|
||||
return f.sdkFile.Path
|
||||
}
|
||||
|
||||
func (f *ObjFile) CreateTime() time.Time {
|
||||
return f.createTime
|
||||
}
|
||||
185
drivers/halalcloud_open/utils.go
Normal file
185
drivers/halalcloud_open/utils.go
Normal file
@@ -0,0 +1,185 @@
|
||||
package halalcloudopen
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// get the next chunk
|
||||
func (oo *openObject) getChunk(_ context.Context) (err error) {
|
||||
if oo.id >= len(oo.chunks) {
|
||||
return io.EOF
|
||||
}
|
||||
var chunk []byte
|
||||
err = utils.Retry(3, time.Second, func() (err error) {
|
||||
chunk, err = getRawFiles(oo.d[oo.id])
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oo.id++
|
||||
oo.chunk = chunk
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read reads up to len(p) bytes into p.
|
||||
func (oo *openObject) Read(p []byte) (n int, err error) {
|
||||
oo.mu.Lock()
|
||||
defer oo.mu.Unlock()
|
||||
if oo.closed {
|
||||
return 0, fmt.Errorf("read on closed file")
|
||||
}
|
||||
// Skip data at the start if requested
|
||||
for oo.skip > 0 {
|
||||
//size := 1024 * 1024
|
||||
_, size, err := oo.ChunkLocation(oo.id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if oo.skip < int64(size) {
|
||||
break
|
||||
}
|
||||
oo.id++
|
||||
oo.skip -= int64(size)
|
||||
}
|
||||
if len(oo.chunk) == 0 {
|
||||
err = oo.getChunk(oo.ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if oo.skip > 0 {
|
||||
oo.chunk = (oo.chunk)[oo.skip:]
|
||||
oo.skip = 0
|
||||
}
|
||||
}
|
||||
n = copy(p, oo.chunk)
|
||||
oo.shaTemp.Write(p[:n])
|
||||
oo.chunk = (oo.chunk)[n:]
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Close closed the file - MAC errors are reported here
|
||||
func (oo *openObject) Close() (err error) {
|
||||
oo.mu.Lock()
|
||||
defer oo.mu.Unlock()
|
||||
if oo.closed {
|
||||
return nil
|
||||
}
|
||||
// 校验Sha1
|
||||
if string(oo.shaTemp.Sum(nil)) != oo.sha {
|
||||
return fmt.Errorf("failed to finish download: SHA mismatch")
|
||||
}
|
||||
|
||||
oo.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetMD5Hash(text string) string {
|
||||
tHash := md5.Sum([]byte(text))
|
||||
return hex.EncodeToString(tHash[:])
|
||||
}
|
||||
|
||||
type chunkSize struct {
|
||||
position int64
|
||||
size int
|
||||
}
|
||||
|
||||
type openObject struct {
|
||||
ctx context.Context
|
||||
mu sync.Mutex
|
||||
d []*sdkUserFile.SliceDownloadInfo
|
||||
id int
|
||||
skip int64
|
||||
chunk []byte
|
||||
chunks []chunkSize
|
||||
closed bool
|
||||
sha string
|
||||
shaTemp hash.Hash
|
||||
}
|
||||
|
||||
func getChunkSizes(sliceSize []*sdkUserFile.SliceSize) (chunks []chunkSize) {
|
||||
chunks = make([]chunkSize, 0)
|
||||
for _, s := range sliceSize {
|
||||
// 对最后一个做特殊处理
|
||||
endIndex := s.EndIndex
|
||||
startIndex := s.StartIndex
|
||||
if endIndex == 0 {
|
||||
endIndex = startIndex
|
||||
}
|
||||
for j := startIndex; j <= endIndex; j++ {
|
||||
size := s.Size
|
||||
chunks = append(chunks, chunkSize{position: j, size: int(size)})
|
||||
}
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
func (oo *openObject) ChunkLocation(id int) (position int64, size int, err error) {
|
||||
if id < 0 || id >= len(oo.chunks) {
|
||||
return 0, 0, errors.New("invalid arguments")
|
||||
}
|
||||
|
||||
return (oo.chunks)[id].position, (oo.chunks)[id].size, nil
|
||||
}
|
||||
|
||||
func getRawFiles(addr *sdkUserFile.SliceDownloadInfo) ([]byte, error) {
|
||||
|
||||
if addr == nil {
|
||||
return nil, errors.New("addr is nil")
|
||||
}
|
||||
|
||||
client := http.Client{
|
||||
Timeout: time.Duration(60 * time.Second), // Set timeout to 60 seconds
|
||||
}
|
||||
resp, err := client.Get(addr.DownloadAddress)
|
||||
if err != nil {
|
||||
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("bad status: %s, body: %s", resp.Status, body)
|
||||
}
|
||||
|
||||
if addr.Encrypt > 0 {
|
||||
cd := uint8(addr.Encrypt)
|
||||
for idx := 0; idx < len(body); idx++ {
|
||||
body[idx] = body[idx] ^ cd
|
||||
}
|
||||
}
|
||||
storeType := addr.StoreType
|
||||
if storeType != 10 {
|
||||
|
||||
sourceCid, err := cid.Decode(addr.Identity)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
checkCid, err := sourceCid.Prefix().Sum(body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !checkCid.Equals(sourceCid) {
|
||||
return nil, fmt.Errorf("bad cid: %s, body: %s", checkCid.String(), body)
|
||||
}
|
||||
}
|
||||
|
||||
return body, nil
|
||||
|
||||
}
|
||||
1
go.mod
1
go.mod
@@ -38,6 +38,7 @@ require (
|
||||
github.com/golang-jwt/jwt/v4 v4.5.2
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/halalcloud/golang-sdk-lite v0.0.0-20251006164234-3c629727c499
|
||||
github.com/hekmon/transmissionrpc/v3 v3.0.0
|
||||
github.com/ipfs/go-ipfs-api v0.7.0
|
||||
github.com/itsHenry35/gofakes3 v0.0.8
|
||||
|
||||
4
go.sum
4
go.sum
@@ -364,6 +364,10 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg
|
||||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/halalcloud/golang-sdk-lite v0.0.0-20251006100629-ba7a40dce261 h1:47L8SHM80cXszQydLrpp9MhVkFLLWCvrU9XmJ6XtRu0=
|
||||
github.com/halalcloud/golang-sdk-lite v0.0.0-20251006100629-ba7a40dce261/go.mod h1:8x1h4rm3s8xMcTyJrq848sQ6BJnKzl57mDY4CNshdPM=
|
||||
github.com/halalcloud/golang-sdk-lite v0.0.0-20251006164234-3c629727c499 h1:4ovnBdiGDFi8putQGxhipuuhXItAgh4/YnzufPYkZkQ=
|
||||
github.com/halalcloud/golang-sdk-lite v0.0.0-20251006164234-3c629727c499/go.mod h1:8x1h4rm3s8xMcTyJrq848sQ6BJnKzl57mDY4CNshdPM=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
|
||||
Reference in New Issue
Block a user