mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-11-25 19:37:41 +08:00
feat(123_open): update upload api v2 (#976)
This commit is contained in:
@@ -2,7 +2,9 @@ package _123_open
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||||
@@ -95,6 +97,22 @@ func (d *Open123) Rename(ctx context.Context, srcObj model.Obj, newName string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Open123) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
|
func (d *Open123) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
|
||||||
|
// 尝试使用上传+MD5秒传功能实现复制
|
||||||
|
// 1. 创建文件
|
||||||
|
// parentFileID 父目录id,上传到根目录时填写 0
|
||||||
|
parentFileId, err := strconv.ParseInt(dstDir.GetID(), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse parentFileID error: %v", err)
|
||||||
|
}
|
||||||
|
etag := srcObj.(File).Etag
|
||||||
|
createResp, err := d.create(parentFileId, srcObj.GetName(), etag, srcObj.GetSize(), 2, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// 是否秒传
|
||||||
|
if createResp.Data.Reuse {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return errs.NotSupport
|
return errs.NotSupport
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -105,9 +123,14 @@ func (d *Open123) Remove(ctx context.Context, obj model.Obj) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
|
func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
|
||||||
|
// 1. 创建文件
|
||||||
|
// parentFileID 父目录id,上传到根目录时填写 0
|
||||||
parentFileId, err := strconv.ParseInt(dstDir.GetID(), 10, 64)
|
parentFileId, err := strconv.ParseInt(dstDir.GetID(), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse parentFileID error: %v", err)
|
||||||
|
}
|
||||||
|
// etag 文件md5
|
||||||
etag := file.GetHash().GetHash(utils.MD5)
|
etag := file.GetHash().GetHash(utils.MD5)
|
||||||
|
|
||||||
if len(etag) < utils.MD5.Width {
|
if len(etag) < utils.MD5.Width {
|
||||||
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
|
||||||
up = model.UpdateProgressWithRange(up, 50, 100)
|
up = model.UpdateProgressWithRange(up, 50, 100)
|
||||||
@@ -120,11 +143,29 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// 是否秒传
|
||||||
if createResp.Data.Reuse {
|
if createResp.Data.Reuse {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.Upload(ctx, file, createResp, up)
|
// 2. 上传分片
|
||||||
|
err = d.Upload(ctx, file, createResp, up)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 上传完毕
|
||||||
|
for range 60 {
|
||||||
|
uploadCompleteResp, err := d.complete(createResp.Data.PreuploadID)
|
||||||
|
// 返回错误代码未知,如:20103,文档也没有具体说
|
||||||
|
if err == nil && uploadCompleteResp.Data.Completed && uploadCompleteResp.Data.FileID != 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// 若接口返回的completed为 false 时,则需间隔1秒继续轮询此接口,获取上传最终结果。
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
up(100)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ driver.Driver = (*Open123)(nil)
|
var _ driver.Driver = (*Open123)(nil)
|
||||||
|
|||||||
@@ -154,52 +154,23 @@ type DownloadInfoResp struct {
|
|||||||
} `json:"data"`
|
} `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 创建文件V2返回
|
||||||
type UploadCreateResp struct {
|
type UploadCreateResp struct {
|
||||||
BaseResp
|
BaseResp
|
||||||
Data struct {
|
Data struct {
|
||||||
FileID int64 `json:"fileID"`
|
FileID int64 `json:"fileID"`
|
||||||
PreuploadID string `json:"preuploadID"`
|
PreuploadID string `json:"preuploadID"`
|
||||||
Reuse bool `json:"reuse"`
|
Reuse bool `json:"reuse"`
|
||||||
SliceSize int64 `json:"sliceSize"`
|
SliceSize int64 `json:"sliceSize"`
|
||||||
|
Servers []string `json:"servers"`
|
||||||
} `json:"data"`
|
} `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type UploadUrlResp struct {
|
// 上传完毕V2返回
|
||||||
BaseResp
|
|
||||||
Data struct {
|
|
||||||
PresignedURL string `json:"presignedURL"`
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type UploadCompleteResp struct {
|
type UploadCompleteResp struct {
|
||||||
BaseResp
|
BaseResp
|
||||||
Data struct {
|
Data struct {
|
||||||
Async bool `json:"async"`
|
|
||||||
Completed bool `json:"completed"`
|
Completed bool `json:"completed"`
|
||||||
FileID int64 `json:"fileID"`
|
FileID int64 `json:"fileID"`
|
||||||
} `json:"data"`
|
} `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type UploadAsyncResp struct {
|
|
||||||
BaseResp
|
|
||||||
Data struct {
|
|
||||||
Completed bool `json:"completed"`
|
|
||||||
FileID int64 `json:"fileID"`
|
|
||||||
} `json:"data"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type UploadResp struct {
|
|
||||||
BaseResp
|
|
||||||
Data struct {
|
|
||||||
AccessKeyId string `json:"AccessKeyId"`
|
|
||||||
Bucket string `json:"Bucket"`
|
|
||||||
Key string `json:"Key"`
|
|
||||||
SecretAccessKey string `json:"SecretAccessKey"`
|
|
||||||
SessionToken string `json:"SessionToken"`
|
|
||||||
FileId int64 `json:"FileId"`
|
|
||||||
Reuse bool `json:"Reuse"`
|
|
||||||
EndPoint string `json:"EndPoint"`
|
|
||||||
StorageNode string `json:"StorageNode"`
|
|
||||||
UploadId string `json:"UploadId"`
|
|
||||||
} `json:"data"`
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,9 +1,14 @@
|
|||||||
package _123_open
|
package _123_open
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"mime/multipart"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -17,6 +22,7 @@ import (
|
|||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// 创建文件 V2
|
||||||
func (d *Open123) create(parentFileID int64, filename string, etag string, size int64, duplicate int, containDir bool) (*UploadCreateResp, error) {
|
func (d *Open123) create(parentFileID int64, filename string, etag string, size int64, duplicate int, containDir bool) (*UploadCreateResp, error) {
|
||||||
var resp UploadCreateResp
|
var resp UploadCreateResp
|
||||||
_, err := d.Request(UploadCreate, http.MethodPost, func(req *resty.Request) {
|
_, err := d.Request(UploadCreate, http.MethodPost, func(req *resty.Request) {
|
||||||
@@ -35,48 +41,9 @@ func (d *Open123) create(parentFileID int64, filename string, etag string, size
|
|||||||
return &resp, nil
|
return &resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Open123) url(preuploadID string, sliceNo int64) (string, error) {
|
// 上传分片 V2
|
||||||
// get upload url
|
|
||||||
var resp UploadUrlResp
|
|
||||||
_, err := d.Request(UploadUrl, http.MethodPost, func(req *resty.Request) {
|
|
||||||
req.SetBody(base.Json{
|
|
||||||
"preuploadId": preuploadID,
|
|
||||||
"sliceNo": sliceNo,
|
|
||||||
})
|
|
||||||
}, &resp)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return resp.Data.PresignedURL, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Open123) complete(preuploadID string) (*UploadCompleteResp, error) {
|
|
||||||
var resp UploadCompleteResp
|
|
||||||
_, err := d.Request(UploadComplete, http.MethodPost, func(req *resty.Request) {
|
|
||||||
req.SetBody(base.Json{
|
|
||||||
"preuploadID": preuploadID,
|
|
||||||
})
|
|
||||||
}, &resp)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Open123) async(preuploadID string) (*UploadAsyncResp, error) {
|
|
||||||
var resp UploadAsyncResp
|
|
||||||
_, err := d.Request(UploadAsync, http.MethodPost, func(req *resty.Request) {
|
|
||||||
req.SetBody(base.Json{
|
|
||||||
"preuploadID": preuploadID,
|
|
||||||
})
|
|
||||||
}, &resp)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createResp *UploadCreateResp, up driver.UpdateProgress) error {
|
func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createResp *UploadCreateResp, up driver.UpdateProgress) error {
|
||||||
|
uploadDomain := createResp.Data.Servers[0]
|
||||||
size := file.GetSize()
|
size := file.GetSize()
|
||||||
chunkSize := createResp.Data.SliceSize
|
chunkSize := createResp.Data.SliceSize
|
||||||
uploadNums := (size + chunkSize - 1) / chunkSize
|
uploadNums := (size + chunkSize - 1) / chunkSize
|
||||||
@@ -90,7 +57,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for partIndex := int64(0); partIndex < uploadNums; partIndex++ {
|
for partIndex := range uploadNums {
|
||||||
if utils.IsCanceled(uploadCtx) {
|
if utils.IsCanceled(uploadCtx) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -100,36 +67,90 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
|||||||
size := min(chunkSize, size-offset)
|
size := min(chunkSize, size-offset)
|
||||||
var reader *stream.SectionReader
|
var reader *stream.SectionReader
|
||||||
var rateLimitedRd io.Reader
|
var rateLimitedRd io.Reader
|
||||||
|
sliceMD5 := ""
|
||||||
threadG.GoWithLifecycle(errgroup.Lifecycle{
|
threadG.GoWithLifecycle(errgroup.Lifecycle{
|
||||||
Before: func(ctx context.Context) error {
|
Before: func(ctx context.Context) error {
|
||||||
if reader == nil {
|
if reader == nil {
|
||||||
var err error
|
var err error
|
||||||
|
// 每个分片一个reader
|
||||||
reader, err = ss.GetSectionReader(offset, size)
|
reader, err = ss.GetSectionReader(offset, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// 计算当前分片的MD5
|
||||||
|
sliceMD5, err = utils.HashReader(utils.MD5, reader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
|
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
Do: func(ctx context.Context) error {
|
Do: func(ctx context.Context) error {
|
||||||
|
// 重置分片reader位置,因为HashReader、上一次失败已经读取到分片EOF
|
||||||
reader.Seek(0, io.SeekStart)
|
reader.Seek(0, io.SeekStart)
|
||||||
uploadPartUrl, err := d.url(createResp.Data.PreuploadID, partNumber)
|
|
||||||
|
// 创建表单数据
|
||||||
|
var b bytes.Buffer
|
||||||
|
w := multipart.NewWriter(&b)
|
||||||
|
// 添加表单字段
|
||||||
|
err = w.WriteField("preuploadID", createResp.Data.PreuploadID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = w.WriteField("sliceNo", strconv.FormatInt(partNumber, 10))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = w.WriteField("sliceMD5", sliceMD5)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// 写入文件内容
|
||||||
|
fw, err := w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = utils.CopyWithBuffer(fw, rateLimitedRd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = w.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadPartUrl, rateLimitedRd)
|
// 创建请求并设置header
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", &b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req.ContentLength = size
|
|
||||||
|
// 设置请求头
|
||||||
|
req.Header.Add("Authorization", "Bearer "+d.AccessToken)
|
||||||
|
req.Header.Add("Content-Type", w.FormDataContentType())
|
||||||
|
req.Header.Add("Platform", "open_platform")
|
||||||
|
|
||||||
res, err := base.HttpClient.Do(req)
|
res, err := base.HttpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_ = res.Body.Close()
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != 200 {
|
||||||
|
return fmt.Errorf("slice %d upload failed, status code: %d", partNumber, res.StatusCode)
|
||||||
|
}
|
||||||
|
var resp BaseResp
|
||||||
|
respBody, err := io.ReadAll(res.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = json.Unmarshal(respBody, &resp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp.Code != 0 {
|
||||||
|
return fmt.Errorf("slice %d upload failed: %s", partNumber, resp.Message)
|
||||||
|
}
|
||||||
|
|
||||||
progress := 10.0 + 85.0*float64(threadG.Success())/float64(uploadNums)
|
progress := 10.0 + 85.0*float64(threadG.Success())/float64(uploadNums)
|
||||||
up(progress)
|
up(progress)
|
||||||
@@ -145,23 +166,19 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
uploadCompleteResp, err := d.complete(createResp.Data.PreuploadID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if uploadCompleteResp.Data.Async == false || uploadCompleteResp.Data.Completed {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
uploadAsyncResp, err := d.async(createResp.Data.PreuploadID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if uploadAsyncResp.Data.Completed {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
up(100)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 上传完毕
|
||||||
|
func (d *Open123) complete(preuploadID string) (*UploadCompleteResp, error) {
|
||||||
|
var resp UploadCompleteResp
|
||||||
|
_, err := d.Request(UploadComplete, http.MethodPost, func(req *resty.Request) {
|
||||||
|
req.SetBody(base.Json{
|
||||||
|
"preuploadID": preuploadID,
|
||||||
|
})
|
||||||
|
}, &resp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &resp, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,16 +19,14 @@ var ( //不同情况下获取的AccessTokenQPS限制不同 如下模块化易于
|
|||||||
AccessToken = InitApiInfo(Api+"/api/v1/access_token", 1)
|
AccessToken = InitApiInfo(Api+"/api/v1/access_token", 1)
|
||||||
RefreshToken = InitApiInfo(Api+"/api/v1/oauth2/access_token", 1)
|
RefreshToken = InitApiInfo(Api+"/api/v1/oauth2/access_token", 1)
|
||||||
UserInfo = InitApiInfo(Api+"/api/v1/user/info", 1)
|
UserInfo = InitApiInfo(Api+"/api/v1/user/info", 1)
|
||||||
FileList = InitApiInfo(Api+"/api/v2/file/list", 4)
|
FileList = InitApiInfo(Api+"/api/v2/file/list", 3)
|
||||||
DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 0)
|
DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 0)
|
||||||
Mkdir = InitApiInfo(Api+"/upload/v1/file/mkdir", 2)
|
Mkdir = InitApiInfo(Api+"/upload/v1/file/mkdir", 2)
|
||||||
Move = InitApiInfo(Api+"/api/v1/file/move", 1)
|
Move = InitApiInfo(Api+"/api/v1/file/move", 1)
|
||||||
Rename = InitApiInfo(Api+"/api/v1/file/name", 1)
|
Rename = InitApiInfo(Api+"/api/v1/file/name", 1)
|
||||||
Trash = InitApiInfo(Api+"/api/v1/file/trash", 2)
|
Trash = InitApiInfo(Api+"/api/v1/file/trash", 2)
|
||||||
UploadCreate = InitApiInfo(Api+"/upload/v1/file/create", 2)
|
UploadCreate = InitApiInfo(Api+"/upload/v2/file/create", 2)
|
||||||
UploadUrl = InitApiInfo(Api+"/upload/v1/file/get_upload_url", 0)
|
UploadComplete = InitApiInfo(Api+"/upload/v2/file/upload_complete", 0)
|
||||||
UploadComplete = InitApiInfo(Api+"/upload/v1/file/upload_complete", 0)
|
|
||||||
UploadAsync = InitApiInfo(Api+"/upload/v1/file/upload_async_result", 1)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *Open123) Request(apiInfo *ApiInfo, method string, callback base.ReqCallback, resp interface{}) ([]byte, error) {
|
func (d *Open123) Request(apiInfo *ApiInfo, method string, callback base.ReqCallback, resp interface{}) ([]byte, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user