mirror of
https://github.com/AlistGo/alist.git
synced 2025-11-24 19:13:09 +08:00
feat(driver/s3): Add OSS Archive Support (#9350)
* feat(s3): Add support for S3 object storage classes Introduces a new 'storage_class' configuration option for S3 providers. Users can now specify the desired storage class (e.g., Standard, GLACIER, DEEP_ARCHIVE) for objects uploaded to S3-compatible services like AWS S3 and Tencent COS. The input storage class string is normalized to match AWS SDK constants, supporting various common aliases. If an unknown storage class is provided, it will be used as a raw value with a warning. This enhancement provides greater control over storage costs and data access patterns. * feat(storage): Support for displaying file storage classes Adds storage class information to file metadata and API responses. This change introduces the ability to store file storage classes in file metadata and display them in API responses. This allows users to view a file's storage tier (e.g., S3 Standard, Glacier), enhancing data management capabilities. Implementation details include: - Introducing the StorageClassProvider interface and the ObjWrapStorageClass structure to uniformly handle and communicate object storage class information. - Updated file metadata structures (e.g., ArchiveObj, FileInfo, RespFile) to include a StorageClass field. - Modified relevant API response functions (e.g., GetFileInfo, GetFileList) to populate and return storage classes. - Integrated functionality for retrieving object storage classes from underlying storage systems (e.g., S3) and wrapping them in lists. * feat(driver/s3): Added the "Other" interface and implemented it by the S3 driver. A new `driver.Other` interface has been added and defined in the `other.go` file. The S3 driver has been updated to implement this new interface, extending its functionality. * feat(s3): Add S3 object archive and thaw task management This commit introduces comprehensive support for S3 object archive and thaw operations, managed asynchronously through a new task system. - **S3 Transition Task System**: - Adds a new `S3Transition` task configuration, including workers, max retries, and persistence options. - Initializes `S3TransitionTaskManager` to handle asynchronous S3 archive/thaw requests. - Registers dedicated API routes for monitoring S3 transition tasks. - **Integrate S3 Archive/Thaw with Other API**: - Modifies the `Other` API handler to intercept `archive` and `thaw` methods for S3 storage drivers. - Dispatches these operations as `S3TransitionTask` instances to the task manager for background processing. - Returns a task ID to the client for tracking the status of the dispatched operation. - **Refactor `other` package for improved API consistency**: - Exports previously internal structs such as `archiveRequest`, `thawRequest`, `objectDescriptor`, `archiveResponse`, `thawResponse`, and `restoreStatus` by making their names public. - Makes helper functions like `decodeOtherArgs`, `normalizeStorageClass`, and `normalizeRestoreTier` public. - Introduces new constants for various S3 `Other` API methods.
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/alist-org/alist/v3/internal/stream"
|
||||
"github.com/alist-org/alist/v3/pkg/cron"
|
||||
"github.com/alist-org/alist/v3/server/common"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
@@ -32,6 +33,33 @@ type S3 struct {
|
||||
cron *cron.Cron
|
||||
}
|
||||
|
||||
var storageClassLookup = map[string]string{
|
||||
"standard": s3.ObjectStorageClassStandard,
|
||||
"reduced_redundancy": s3.ObjectStorageClassReducedRedundancy,
|
||||
"glacier": s3.ObjectStorageClassGlacier,
|
||||
"standard_ia": s3.ObjectStorageClassStandardIa,
|
||||
"onezone_ia": s3.ObjectStorageClassOnezoneIa,
|
||||
"intelligent_tiering": s3.ObjectStorageClassIntelligentTiering,
|
||||
"deep_archive": s3.ObjectStorageClassDeepArchive,
|
||||
"outposts": s3.ObjectStorageClassOutposts,
|
||||
"glacier_ir": s3.ObjectStorageClassGlacierIr,
|
||||
"snow": s3.ObjectStorageClassSnow,
|
||||
"express_onezone": s3.ObjectStorageClassExpressOnezone,
|
||||
}
|
||||
|
||||
func (d *S3) resolveStorageClass() *string {
|
||||
value := strings.TrimSpace(d.StorageClass)
|
||||
if value == "" {
|
||||
return nil
|
||||
}
|
||||
normalized := strings.ToLower(strings.ReplaceAll(value, "-", "_"))
|
||||
if v, ok := storageClassLookup[normalized]; ok {
|
||||
return aws.String(v)
|
||||
}
|
||||
log.Warnf("s3: unknown storage class %q, using raw value", d.StorageClass)
|
||||
return aws.String(value)
|
||||
}
|
||||
|
||||
func (d *S3) Config() driver.Config {
|
||||
return d.config
|
||||
}
|
||||
@@ -179,8 +207,14 @@ func (d *S3) Put(ctx context.Context, dstDir model.Obj, s model.FileStreamer, up
|
||||
}),
|
||||
ContentType: &contentType,
|
||||
}
|
||||
if storageClass := d.resolveStorageClass(); storageClass != nil {
|
||||
input.StorageClass = storageClass
|
||||
}
|
||||
_, err := uploader.UploadWithContext(ctx, input)
|
||||
return err
|
||||
}
|
||||
|
||||
var _ driver.Driver = (*S3)(nil)
|
||||
var (
|
||||
_ driver.Driver = (*S3)(nil)
|
||||
_ driver.Other = (*S3)(nil)
|
||||
)
|
||||
|
||||
@@ -21,6 +21,7 @@ type Addition struct {
|
||||
ListObjectVersion string `json:"list_object_version" type:"select" options:"v1,v2" default:"v1"`
|
||||
RemoveBucket bool `json:"remove_bucket" help:"Remove bucket name from path when using custom host."`
|
||||
AddFilenameToDisposition bool `json:"add_filename_to_disposition" help:"Add filename to Content-Disposition header."`
|
||||
StorageClass string `json:"storage_class" type:"select" options:",standard,standard_ia,onezone_ia,intelligent_tiering,glacier,glacier_ir,deep_archive,archive" help:"Storage class for new objects. AWS and Tencent COS support different subsets (COS uses ARCHIVE/DEEP_ARCHIVE)."`
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
286
drivers/s3/other.go
Normal file
286
drivers/s3/other.go
Normal file
@@ -0,0 +1,286 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/errs"
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
)
|
||||
|
||||
const (
|
||||
OtherMethodArchive = "archive"
|
||||
OtherMethodArchiveStatus = "archive_status"
|
||||
OtherMethodThaw = "thaw"
|
||||
OtherMethodThawStatus = "thaw_status"
|
||||
)
|
||||
|
||||
type ArchiveRequest struct {
|
||||
StorageClass string `json:"storage_class"`
|
||||
}
|
||||
|
||||
type ThawRequest struct {
|
||||
Days int64 `json:"days"`
|
||||
Tier string `json:"tier"`
|
||||
}
|
||||
|
||||
type ObjectDescriptor struct {
|
||||
Path string `json:"path"`
|
||||
Bucket string `json:"bucket"`
|
||||
Key string `json:"key"`
|
||||
}
|
||||
|
||||
type ArchiveResponse struct {
|
||||
Action string `json:"action"`
|
||||
Object ObjectDescriptor `json:"object"`
|
||||
StorageClass string `json:"storage_class"`
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
VersionID string `json:"version_id,omitempty"`
|
||||
ETag string `json:"etag,omitempty"`
|
||||
LastModified string `json:"last_modified,omitempty"`
|
||||
}
|
||||
|
||||
type ThawResponse struct {
|
||||
Action string `json:"action"`
|
||||
Object ObjectDescriptor `json:"object"`
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
Status *RestoreStatus `json:"status,omitempty"`
|
||||
}
|
||||
|
||||
type RestoreStatus struct {
|
||||
Ongoing bool `json:"ongoing"`
|
||||
Expiry string `json:"expiry,omitempty"`
|
||||
Raw string `json:"raw"`
|
||||
}
|
||||
|
||||
func (d *S3) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) {
|
||||
if args.Obj == nil {
|
||||
return nil, fmt.Errorf("missing object reference")
|
||||
}
|
||||
if args.Obj.IsDir() {
|
||||
return nil, errs.NotSupport
|
||||
}
|
||||
|
||||
switch strings.ToLower(strings.TrimSpace(args.Method)) {
|
||||
case "archive":
|
||||
return d.archive(ctx, args)
|
||||
case "archive_status":
|
||||
return d.archiveStatus(ctx, args)
|
||||
case "thaw":
|
||||
return d.thaw(ctx, args)
|
||||
case "thaw_status":
|
||||
return d.thawStatus(ctx, args)
|
||||
default:
|
||||
return nil, errs.NotSupport
|
||||
}
|
||||
}
|
||||
|
||||
func (d *S3) archive(ctx context.Context, args model.OtherArgs) (interface{}, error) {
|
||||
key := getKey(args.Obj.GetPath(), false)
|
||||
payload := ArchiveRequest{}
|
||||
if err := DecodeOtherArgs(args.Data, &payload); err != nil {
|
||||
return nil, fmt.Errorf("parse archive request: %w", err)
|
||||
}
|
||||
if payload.StorageClass == "" {
|
||||
return nil, fmt.Errorf("storage_class is required")
|
||||
}
|
||||
storageClass := NormalizeStorageClass(payload.StorageClass)
|
||||
input := &s3.CopyObjectInput{
|
||||
Bucket: &d.Bucket,
|
||||
Key: &key,
|
||||
CopySource: aws.String(url.PathEscape(d.Bucket + "/" + key)),
|
||||
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
|
||||
StorageClass: aws.String(storageClass),
|
||||
}
|
||||
copyReq, output := d.client.CopyObjectRequest(input)
|
||||
copyReq.SetContext(ctx)
|
||||
if err := copyReq.Send(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp := ArchiveResponse{
|
||||
Action: "archive",
|
||||
Object: d.describeObject(args.Obj, key),
|
||||
StorageClass: storageClass,
|
||||
RequestID: copyReq.RequestID,
|
||||
}
|
||||
if output.VersionId != nil {
|
||||
resp.VersionID = aws.StringValue(output.VersionId)
|
||||
}
|
||||
if result := output.CopyObjectResult; result != nil {
|
||||
resp.ETag = aws.StringValue(result.ETag)
|
||||
if result.LastModified != nil {
|
||||
resp.LastModified = result.LastModified.UTC().Format(time.RFC3339)
|
||||
}
|
||||
}
|
||||
if status, err := d.describeObjectStatus(ctx, key); err == nil {
|
||||
if status.StorageClass != "" {
|
||||
resp.StorageClass = status.StorageClass
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (d *S3) archiveStatus(ctx context.Context, args model.OtherArgs) (interface{}, error) {
|
||||
key := getKey(args.Obj.GetPath(), false)
|
||||
status, err := d.describeObjectStatus(ctx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ArchiveResponse{
|
||||
Action: "archive_status",
|
||||
Object: d.describeObject(args.Obj, key),
|
||||
StorageClass: status.StorageClass,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error) {
|
||||
key := getKey(args.Obj.GetPath(), false)
|
||||
payload := ThawRequest{Days: 1}
|
||||
if err := DecodeOtherArgs(args.Data, &payload); err != nil {
|
||||
return nil, fmt.Errorf("parse thaw request: %w", err)
|
||||
}
|
||||
if payload.Days <= 0 {
|
||||
payload.Days = 1
|
||||
}
|
||||
restoreRequest := &s3.RestoreRequest{
|
||||
Days: aws.Int64(payload.Days),
|
||||
}
|
||||
if tier := NormalizeRestoreTier(payload.Tier); tier != "" {
|
||||
restoreRequest.GlacierJobParameters = &s3.GlacierJobParameters{Tier: aws.String(tier)}
|
||||
}
|
||||
input := &s3.RestoreObjectInput{
|
||||
Bucket: &d.Bucket,
|
||||
Key: &key,
|
||||
RestoreRequest: restoreRequest,
|
||||
}
|
||||
restoreReq, _ := d.client.RestoreObjectRequest(input)
|
||||
restoreReq.SetContext(ctx)
|
||||
if err := restoreReq.Send(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
status, _ := d.describeObjectStatus(ctx, key)
|
||||
resp := ThawResponse{
|
||||
Action: "thaw",
|
||||
Object: d.describeObject(args.Obj, key),
|
||||
RequestID: restoreReq.RequestID,
|
||||
}
|
||||
if status != nil {
|
||||
resp.Status = status.Restore
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (d *S3) thawStatus(ctx context.Context, args model.OtherArgs) (interface{}, error) {
|
||||
key := getKey(args.Obj.GetPath(), false)
|
||||
status, err := d.describeObjectStatus(ctx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ThawResponse{
|
||||
Action: "thaw_status",
|
||||
Object: d.describeObject(args.Obj, key),
|
||||
Status: status.Restore,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *S3) describeObject(obj model.Obj, key string) ObjectDescriptor {
|
||||
return ObjectDescriptor{
|
||||
Path: obj.GetPath(),
|
||||
Bucket: d.Bucket,
|
||||
Key: key,
|
||||
}
|
||||
}
|
||||
|
||||
type objectStatus struct {
|
||||
StorageClass string
|
||||
Restore *RestoreStatus
|
||||
}
|
||||
|
||||
func (d *S3) describeObjectStatus(ctx context.Context, key string) (*objectStatus, error) {
|
||||
head, err := d.client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{Bucket: &d.Bucket, Key: &key})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
status := &objectStatus{
|
||||
StorageClass: aws.StringValue(head.StorageClass),
|
||||
Restore: parseRestoreHeader(head.Restore),
|
||||
}
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func parseRestoreHeader(header *string) *RestoreStatus {
|
||||
if header == nil {
|
||||
return nil
|
||||
}
|
||||
value := strings.TrimSpace(*header)
|
||||
if value == "" {
|
||||
return nil
|
||||
}
|
||||
status := &RestoreStatus{Raw: value}
|
||||
parts := strings.Split(value, ",")
|
||||
for _, part := range parts {
|
||||
part = strings.TrimSpace(part)
|
||||
if part == "" {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(part, "ongoing-request=") {
|
||||
status.Ongoing = strings.Contains(part, "\"true\"")
|
||||
}
|
||||
if strings.HasPrefix(part, "expiry-date=") {
|
||||
expiry := strings.Trim(part[len("expiry-date="):], "\"")
|
||||
if expiry != "" {
|
||||
if t, err := time.Parse(time.RFC1123, expiry); err == nil {
|
||||
status.Expiry = t.UTC().Format(time.RFC3339)
|
||||
} else {
|
||||
status.Expiry = expiry
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
func DecodeOtherArgs(data interface{}, target interface{}) error {
|
||||
if data == nil {
|
||||
return nil
|
||||
}
|
||||
raw, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return json.Unmarshal(raw, target)
|
||||
}
|
||||
|
||||
func NormalizeStorageClass(value string) string {
|
||||
normalized := strings.ToLower(strings.TrimSpace(strings.ReplaceAll(value, "-", "_")))
|
||||
if normalized == "" {
|
||||
return value
|
||||
}
|
||||
if v, ok := storageClassLookup[normalized]; ok {
|
||||
return v
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func NormalizeRestoreTier(value string) string {
|
||||
normalized := strings.ToLower(strings.TrimSpace(value))
|
||||
switch normalized {
|
||||
case "", "default":
|
||||
return ""
|
||||
case "bulk":
|
||||
return s3.TierBulk
|
||||
case "standard":
|
||||
return s3.TierStandard
|
||||
case "expedited":
|
||||
return s3.TierExpedited
|
||||
default:
|
||||
return value
|
||||
}
|
||||
}
|
||||
@@ -109,13 +109,13 @@ func (d *S3) listV1(prefix string, args model.ListArgs) ([]model.Obj, error) {
|
||||
if !args.S3ShowPlaceholder && (name == getPlaceholderName(d.Placeholder) || name == d.Placeholder) {
|
||||
continue
|
||||
}
|
||||
file := model.Object{
|
||||
file := &model.Object{
|
||||
//Id: *object.Key,
|
||||
Name: name,
|
||||
Size: *object.Size,
|
||||
Modified: *object.LastModified,
|
||||
}
|
||||
files = append(files, &file)
|
||||
files = append(files, model.WrapObjStorageClass(file, aws.StringValue(object.StorageClass)))
|
||||
}
|
||||
if listObjectsResult.IsTruncated == nil {
|
||||
return nil, errors.New("IsTruncated nil")
|
||||
@@ -164,13 +164,13 @@ func (d *S3) listV2(prefix string, args model.ListArgs) ([]model.Obj, error) {
|
||||
if !args.S3ShowPlaceholder && (name == getPlaceholderName(d.Placeholder) || name == d.Placeholder) {
|
||||
continue
|
||||
}
|
||||
file := model.Object{
|
||||
file := &model.Object{
|
||||
//Id: *object.Key,
|
||||
Name: name,
|
||||
Size: *object.Size,
|
||||
Modified: *object.LastModified,
|
||||
}
|
||||
files = append(files, &file)
|
||||
files = append(files, model.WrapObjStorageClass(file, aws.StringValue(object.StorageClass)))
|
||||
}
|
||||
if !aws.BoolValue(listObjectsResult.IsTruncated) {
|
||||
break
|
||||
@@ -202,6 +202,9 @@ func (d *S3) copyFile(ctx context.Context, src string, dst string) error {
|
||||
CopySource: aws.String(url.PathEscape(d.Bucket + "/" + srcKey)),
|
||||
Key: &dstKey,
|
||||
}
|
||||
if storageClass := d.resolveStorageClass(); storageClass != nil {
|
||||
input.StorageClass = storageClass
|
||||
}
|
||||
_, err := d.client.CopyObject(input)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -37,6 +37,18 @@ func InitTaskManager() {
|
||||
if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted
|
||||
CleanTempDir()
|
||||
}
|
||||
workers := conf.Conf.Tasks.S3Transition.Workers
|
||||
if workers < 0 {
|
||||
workers = 0
|
||||
}
|
||||
fs.S3TransitionTaskManager = tache.NewManager[*fs.S3TransitionTask](
|
||||
tache.WithWorks(workers),
|
||||
tache.WithPersistFunction(
|
||||
db.GetTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant),
|
||||
db.UpdateTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant),
|
||||
),
|
||||
tache.WithMaxRetry(conf.Conf.Tasks.S3Transition.MaxRetry),
|
||||
)
|
||||
fs.ArchiveDownloadTaskManager = tache.NewManager[*fs.ArchiveDownloadTask](tache.WithWorks(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant), db.UpdateTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Decompress.MaxRetry))
|
||||
op.RegisterSettingChangingCallback(func() {
|
||||
fs.ArchiveDownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)))
|
||||
|
||||
@@ -60,6 +60,7 @@ type TasksConfig struct {
|
||||
Copy TaskConfig `json:"copy" envPrefix:"COPY_"`
|
||||
Decompress TaskConfig `json:"decompress" envPrefix:"DECOMPRESS_"`
|
||||
DecompressUpload TaskConfig `json:"decompress_upload" envPrefix:"DECOMPRESS_UPLOAD_"`
|
||||
S3Transition TaskConfig `json:"s3_transition" envPrefix:"S3_TRANSITION_"`
|
||||
AllowRetryCanceled bool `json:"allow_retry_canceled" env:"ALLOW_RETRY_CANCELED"`
|
||||
}
|
||||
|
||||
@@ -184,6 +185,11 @@ func DefaultConfig() *Config {
|
||||
Workers: 5,
|
||||
MaxRetry: 2,
|
||||
},
|
||||
S3Transition: TaskConfig{
|
||||
Workers: 5,
|
||||
MaxRetry: 2,
|
||||
// TaskPersistant: true,
|
||||
},
|
||||
AllowRetryCanceled: false,
|
||||
},
|
||||
Cors: Cors{
|
||||
|
||||
@@ -2,10 +2,15 @@ package fs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
stdpath "path"
|
||||
"strings"
|
||||
|
||||
"github.com/alist-org/alist/v3/drivers/s3"
|
||||
"github.com/alist-org/alist/v3/internal/errs"
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/internal/op"
|
||||
"github.com/alist-org/alist/v3/internal/task"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
@@ -53,6 +58,38 @@ func other(ctx context.Context, args model.FsOtherArgs) (interface{}, error) {
|
||||
if err != nil {
|
||||
return nil, errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
originalPath := args.Path
|
||||
|
||||
if _, ok := storage.(*s3.S3); ok {
|
||||
method := strings.ToLower(strings.TrimSpace(args.Method))
|
||||
if method == s3.OtherMethodArchive || method == s3.OtherMethodThaw {
|
||||
if S3TransitionTaskManager == nil {
|
||||
return nil, errors.New("s3 transition task manager is not initialized")
|
||||
}
|
||||
var payload json.RawMessage
|
||||
if args.Data != nil {
|
||||
raw, err := json.Marshal(args.Data)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessage(err, "failed to encode request payload")
|
||||
}
|
||||
payload = raw
|
||||
}
|
||||
taskCreator, _ := ctx.Value("user").(*model.User)
|
||||
tsk := &S3TransitionTask{
|
||||
TaskExtension: task.TaskExtension{Creator: taskCreator},
|
||||
status: "queued",
|
||||
StorageMountPath: storage.GetStorage().MountPath,
|
||||
ObjectPath: actualPath,
|
||||
DisplayPath: originalPath,
|
||||
ObjectName: stdpath.Base(actualPath),
|
||||
Transition: method,
|
||||
Payload: payload,
|
||||
}
|
||||
S3TransitionTaskManager.Add(tsk)
|
||||
return map[string]string{"task_id": tsk.GetID()}, nil
|
||||
}
|
||||
}
|
||||
|
||||
args.Path = actualPath
|
||||
return op.Other(ctx, storage, args)
|
||||
}
|
||||
|
||||
310
internal/fs/s3_transition.go
Normal file
310
internal/fs/s3_transition.go
Normal file
@@ -0,0 +1,310 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/alist-org/alist/v3/drivers/s3"
|
||||
"github.com/alist-org/alist/v3/internal/driver"
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/internal/op"
|
||||
"github.com/alist-org/alist/v3/internal/task"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/xhofe/tache"
|
||||
)
|
||||
|
||||
const s3TransitionPollInterval = 15 * time.Second
|
||||
|
||||
// S3TransitionTask represents an asynchronous S3 archive/thaw request that is
|
||||
// tracked via the task manager so that clients can monitor the progress of the
|
||||
// operation.
|
||||
type S3TransitionTask struct {
|
||||
task.TaskExtension
|
||||
status string
|
||||
|
||||
StorageMountPath string `json:"storage_mount_path"`
|
||||
ObjectPath string `json:"object_path"`
|
||||
DisplayPath string `json:"display_path"`
|
||||
ObjectName string `json:"object_name"`
|
||||
Transition string `json:"transition"`
|
||||
Payload json.RawMessage `json:"payload,omitempty"`
|
||||
|
||||
TargetStorageClass string `json:"target_storage_class,omitempty"`
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
VersionID string `json:"version_id,omitempty"`
|
||||
|
||||
storage driver.Driver `json:"-"`
|
||||
}
|
||||
|
||||
// S3TransitionTaskManager holds asynchronous S3 archive/thaw tasks.
|
||||
var S3TransitionTaskManager *tache.Manager[*S3TransitionTask]
|
||||
|
||||
var _ task.TaskExtensionInfo = (*S3TransitionTask)(nil)
|
||||
|
||||
func (t *S3TransitionTask) GetName() string {
|
||||
action := strings.ToLower(t.Transition)
|
||||
if action == "" {
|
||||
action = "transition"
|
||||
}
|
||||
display := t.DisplayPath
|
||||
if display == "" {
|
||||
display = t.ObjectPath
|
||||
}
|
||||
if display == "" {
|
||||
display = t.ObjectName
|
||||
}
|
||||
return fmt.Sprintf("s3 %s %s", action, display)
|
||||
}
|
||||
|
||||
func (t *S3TransitionTask) GetStatus() string {
|
||||
return t.status
|
||||
}
|
||||
|
||||
func (t *S3TransitionTask) Run() error {
|
||||
t.ReinitCtx()
|
||||
t.ClearEndTime()
|
||||
start := time.Now()
|
||||
t.SetStartTime(start)
|
||||
defer func() { t.SetEndTime(time.Now()) }()
|
||||
|
||||
if err := t.ensureStorage(); err != nil {
|
||||
t.status = fmt.Sprintf("locate storage failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
payload, err := t.decodePayload()
|
||||
if err != nil {
|
||||
t.status = fmt.Sprintf("decode payload failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
method := strings.ToLower(strings.TrimSpace(t.Transition))
|
||||
switch method {
|
||||
case s3.OtherMethodArchive:
|
||||
t.status = "submitting archive request"
|
||||
t.SetProgress(0)
|
||||
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
|
||||
Path: t.ObjectPath,
|
||||
Method: s3.OtherMethodArchive,
|
||||
Data: payload,
|
||||
})
|
||||
if err != nil {
|
||||
t.status = fmt.Sprintf("archive request failed: %v", err)
|
||||
return err
|
||||
}
|
||||
archiveResp, ok := toArchiveResponse(resp)
|
||||
if ok {
|
||||
if t.TargetStorageClass == "" {
|
||||
t.TargetStorageClass = archiveResp.StorageClass
|
||||
}
|
||||
t.RequestID = archiveResp.RequestID
|
||||
t.VersionID = archiveResp.VersionID
|
||||
if archiveResp.StorageClass != "" {
|
||||
t.status = fmt.Sprintf("archive requested, waiting for %s", archiveResp.StorageClass)
|
||||
} else {
|
||||
t.status = "archive requested"
|
||||
}
|
||||
} else if sc := t.extractTargetStorageClass(); sc != "" {
|
||||
t.TargetStorageClass = sc
|
||||
t.status = fmt.Sprintf("archive requested, waiting for %s", sc)
|
||||
} else {
|
||||
t.status = "archive requested"
|
||||
}
|
||||
if t.TargetStorageClass != "" {
|
||||
t.TargetStorageClass = s3.NormalizeStorageClass(t.TargetStorageClass)
|
||||
}
|
||||
t.SetProgress(25)
|
||||
return t.waitForArchive()
|
||||
case s3.OtherMethodThaw:
|
||||
t.status = "submitting thaw request"
|
||||
t.SetProgress(0)
|
||||
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
|
||||
Path: t.ObjectPath,
|
||||
Method: s3.OtherMethodThaw,
|
||||
Data: payload,
|
||||
})
|
||||
if err != nil {
|
||||
t.status = fmt.Sprintf("thaw request failed: %v", err)
|
||||
return err
|
||||
}
|
||||
thawResp, ok := toThawResponse(resp)
|
||||
if ok {
|
||||
t.RequestID = thawResp.RequestID
|
||||
if thawResp.Status != nil && !thawResp.Status.Ongoing {
|
||||
t.SetProgress(100)
|
||||
t.status = thawCompletionMessage(thawResp.Status)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
t.status = "thaw requested"
|
||||
t.SetProgress(25)
|
||||
return t.waitForThaw()
|
||||
default:
|
||||
return errors.Errorf("unsupported transition method: %s", t.Transition)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *S3TransitionTask) ensureStorage() error {
|
||||
if t.storage != nil {
|
||||
return nil
|
||||
}
|
||||
storage, err := op.GetStorageByMountPath(t.StorageMountPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.storage = storage
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *S3TransitionTask) decodePayload() (interface{}, error) {
|
||||
if len(t.Payload) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var payload interface{}
|
||||
if err := json.Unmarshal(t.Payload, &payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
func (t *S3TransitionTask) extractTargetStorageClass() string {
|
||||
if len(t.Payload) == 0 {
|
||||
return ""
|
||||
}
|
||||
var req s3.ArchiveRequest
|
||||
if err := json.Unmarshal(t.Payload, &req); err != nil {
|
||||
return ""
|
||||
}
|
||||
return s3.NormalizeStorageClass(req.StorageClass)
|
||||
}
|
||||
|
||||
func (t *S3TransitionTask) waitForArchive() error {
|
||||
ticker := time.NewTicker(s3TransitionPollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
ctx := t.Ctx()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.status = "archive canceled"
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
|
||||
Path: t.ObjectPath,
|
||||
Method: s3.OtherMethodArchiveStatus,
|
||||
})
|
||||
if err != nil {
|
||||
t.status = fmt.Sprintf("archive status error: %v", err)
|
||||
return err
|
||||
}
|
||||
archiveResp, ok := toArchiveResponse(resp)
|
||||
if !ok {
|
||||
t.status = fmt.Sprintf("unexpected archive status response: %T", resp)
|
||||
return errors.Errorf("unexpected archive status response: %T", resp)
|
||||
}
|
||||
currentClass := strings.TrimSpace(archiveResp.StorageClass)
|
||||
target := strings.TrimSpace(t.TargetStorageClass)
|
||||
if target == "" {
|
||||
target = currentClass
|
||||
t.TargetStorageClass = currentClass
|
||||
}
|
||||
if currentClass == "" {
|
||||
t.status = "waiting for storage class update"
|
||||
t.SetProgress(50)
|
||||
continue
|
||||
}
|
||||
if strings.EqualFold(currentClass, target) {
|
||||
t.SetProgress(100)
|
||||
t.status = fmt.Sprintf("archive complete (%s)", currentClass)
|
||||
return nil
|
||||
}
|
||||
t.status = fmt.Sprintf("storage class %s (target %s)", currentClass, target)
|
||||
t.SetProgress(75)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *S3TransitionTask) waitForThaw() error {
|
||||
ticker := time.NewTicker(s3TransitionPollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
ctx := t.Ctx()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.status = "thaw canceled"
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
|
||||
Path: t.ObjectPath,
|
||||
Method: s3.OtherMethodThawStatus,
|
||||
})
|
||||
if err != nil {
|
||||
t.status = fmt.Sprintf("thaw status error: %v", err)
|
||||
return err
|
||||
}
|
||||
thawResp, ok := toThawResponse(resp)
|
||||
if !ok {
|
||||
t.status = fmt.Sprintf("unexpected thaw status response: %T", resp)
|
||||
return errors.Errorf("unexpected thaw status response: %T", resp)
|
||||
}
|
||||
status := thawResp.Status
|
||||
if status == nil {
|
||||
t.status = "waiting for thaw status"
|
||||
t.SetProgress(50)
|
||||
continue
|
||||
}
|
||||
if status.Ongoing {
|
||||
t.status = fmt.Sprintf("thaw in progress (%s)", status.Raw)
|
||||
t.SetProgress(75)
|
||||
continue
|
||||
}
|
||||
t.SetProgress(100)
|
||||
t.status = thawCompletionMessage(status)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func thawCompletionMessage(status *s3.RestoreStatus) string {
|
||||
if status == nil {
|
||||
return "thaw complete"
|
||||
}
|
||||
if status.Expiry != "" {
|
||||
return fmt.Sprintf("thaw complete, expires %s", status.Expiry)
|
||||
}
|
||||
return "thaw complete"
|
||||
}
|
||||
|
||||
func toArchiveResponse(v interface{}) (s3.ArchiveResponse, bool) {
|
||||
switch resp := v.(type) {
|
||||
case s3.ArchiveResponse:
|
||||
return resp, true
|
||||
case *s3.ArchiveResponse:
|
||||
if resp != nil {
|
||||
return *resp, true
|
||||
}
|
||||
}
|
||||
return s3.ArchiveResponse{}, false
|
||||
}
|
||||
|
||||
func toThawResponse(v interface{}) (s3.ThawResponse, bool) {
|
||||
switch resp := v.(type) {
|
||||
case s3.ThawResponse:
|
||||
return resp, true
|
||||
case *s3.ThawResponse:
|
||||
if resp != nil {
|
||||
return *resp, true
|
||||
}
|
||||
}
|
||||
return s3.ThawResponse{}, false
|
||||
}
|
||||
|
||||
// Ensure compatibility with persistence when tasks are restored.
|
||||
func (t *S3TransitionTask) OnRestore() {
|
||||
// The storage handle is not persisted intentionally; it will be lazily
|
||||
// re-fetched on the next Run invocation.
|
||||
t.storage = nil
|
||||
}
|
||||
@@ -20,6 +20,10 @@ type ObjUnwrap interface {
|
||||
Unwrap() Obj
|
||||
}
|
||||
|
||||
type StorageClassProvider interface {
|
||||
StorageClass() string
|
||||
}
|
||||
|
||||
type Obj interface {
|
||||
GetSize() int64
|
||||
GetName() string
|
||||
@@ -141,6 +145,13 @@ func WrapObjsName(objs []Obj) {
|
||||
}
|
||||
}
|
||||
|
||||
func WrapObjStorageClass(obj Obj, storageClass string) Obj {
|
||||
if storageClass == "" {
|
||||
return obj
|
||||
}
|
||||
return &ObjWrapStorageClass{Obj: obj, storageClass: storageClass}
|
||||
}
|
||||
|
||||
func UnwrapObj(obj Obj) Obj {
|
||||
if unwrap, ok := obj.(ObjUnwrap); ok {
|
||||
obj = unwrap.Unwrap()
|
||||
@@ -168,6 +179,20 @@ func GetUrl(obj Obj) (url string, ok bool) {
|
||||
return url, false
|
||||
}
|
||||
|
||||
func GetStorageClass(obj Obj) (string, bool) {
|
||||
if provider, ok := obj.(StorageClassProvider); ok {
|
||||
value := provider.StorageClass()
|
||||
if value == "" {
|
||||
return "", false
|
||||
}
|
||||
return value, true
|
||||
}
|
||||
if unwrap, ok := obj.(ObjUnwrap); ok {
|
||||
return GetStorageClass(unwrap.Unwrap())
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func GetRawObject(obj Obj) *Object {
|
||||
switch v := obj.(type) {
|
||||
case *ObjThumbURL:
|
||||
|
||||
@@ -11,6 +11,11 @@ type ObjWrapName struct {
|
||||
Obj
|
||||
}
|
||||
|
||||
type ObjWrapStorageClass struct {
|
||||
storageClass string
|
||||
Obj
|
||||
}
|
||||
|
||||
func (o *ObjWrapName) Unwrap() Obj {
|
||||
return o.Obj
|
||||
}
|
||||
@@ -19,6 +24,20 @@ func (o *ObjWrapName) GetName() string {
|
||||
return o.Name
|
||||
}
|
||||
|
||||
func (o *ObjWrapStorageClass) Unwrap() Obj {
|
||||
return o.Obj
|
||||
}
|
||||
|
||||
func (o *ObjWrapStorageClass) StorageClass() string {
|
||||
return o.storageClass
|
||||
}
|
||||
|
||||
func (o *ObjWrapStorageClass) SetPath(path string) {
|
||||
if setter, ok := o.Obj.(SetPath); ok {
|
||||
setter.SetPath(path)
|
||||
}
|
||||
}
|
||||
|
||||
type Object struct {
|
||||
ID string
|
||||
Path string
|
||||
|
||||
@@ -44,17 +44,19 @@ type ArchiveContentResp struct {
|
||||
}
|
||||
|
||||
func toObjsRespWithoutSignAndThumb(obj model.Obj) ObjResp {
|
||||
storageClass, _ := model.GetStorageClass(obj)
|
||||
return ObjResp{
|
||||
Name: obj.GetName(),
|
||||
Size: obj.GetSize(),
|
||||
IsDir: obj.IsDir(),
|
||||
Modified: obj.ModTime(),
|
||||
Created: obj.CreateTime(),
|
||||
HashInfoStr: obj.GetHash().String(),
|
||||
HashInfo: obj.GetHash().Export(),
|
||||
Sign: "",
|
||||
Thumb: "",
|
||||
Type: utils.GetObjType(obj.GetName(), obj.IsDir()),
|
||||
Name: obj.GetName(),
|
||||
Size: obj.GetSize(),
|
||||
IsDir: obj.IsDir(),
|
||||
Modified: obj.ModTime(),
|
||||
Created: obj.CreateTime(),
|
||||
HashInfoStr: obj.GetHash().String(),
|
||||
HashInfo: obj.GetHash().Export(),
|
||||
Sign: "",
|
||||
Thumb: "",
|
||||
Type: utils.GetObjType(obj.GetName(), obj.IsDir()),
|
||||
StorageClass: storageClass,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,18 +33,19 @@ type DirReq struct {
|
||||
}
|
||||
|
||||
type ObjResp struct {
|
||||
Id string `json:"id"`
|
||||
Path string `json:"path"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
IsDir bool `json:"is_dir"`
|
||||
Modified time.Time `json:"modified"`
|
||||
Created time.Time `json:"created"`
|
||||
Sign string `json:"sign"`
|
||||
Thumb string `json:"thumb"`
|
||||
Type int `json:"type"`
|
||||
HashInfoStr string `json:"hashinfo"`
|
||||
HashInfo map[*utils.HashType]string `json:"hash_info"`
|
||||
Id string `json:"id"`
|
||||
Path string `json:"path"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
IsDir bool `json:"is_dir"`
|
||||
Modified time.Time `json:"modified"`
|
||||
Created time.Time `json:"created"`
|
||||
Sign string `json:"sign"`
|
||||
Thumb string `json:"thumb"`
|
||||
Type int `json:"type"`
|
||||
HashInfoStr string `json:"hashinfo"`
|
||||
HashInfo map[*utils.HashType]string `json:"hash_info"`
|
||||
StorageClass string `json:"storage_class,omitempty"`
|
||||
}
|
||||
|
||||
type FsListResp struct {
|
||||
@@ -57,19 +58,20 @@ type FsListResp struct {
|
||||
}
|
||||
|
||||
type ObjLabelResp struct {
|
||||
Id string `json:"id"`
|
||||
Path string `json:"path"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
IsDir bool `json:"is_dir"`
|
||||
Modified time.Time `json:"modified"`
|
||||
Created time.Time `json:"created"`
|
||||
Sign string `json:"sign"`
|
||||
Thumb string `json:"thumb"`
|
||||
Type int `json:"type"`
|
||||
HashInfoStr string `json:"hashinfo"`
|
||||
HashInfo map[*utils.HashType]string `json:"hash_info"`
|
||||
LabelList []model.Label `json:"label_list"`
|
||||
Id string `json:"id"`
|
||||
Path string `json:"path"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
IsDir bool `json:"is_dir"`
|
||||
Modified time.Time `json:"modified"`
|
||||
Created time.Time `json:"created"`
|
||||
Sign string `json:"sign"`
|
||||
Thumb string `json:"thumb"`
|
||||
Type int `json:"type"`
|
||||
HashInfoStr string `json:"hashinfo"`
|
||||
HashInfo map[*utils.HashType]string `json:"hash_info"`
|
||||
LabelList []model.Label `json:"label_list"`
|
||||
StorageClass string `json:"storage_class,omitempty"`
|
||||
}
|
||||
|
||||
func FsList(c *gin.Context) {
|
||||
@@ -256,20 +258,22 @@ func toObjsResp(objs []model.Obj, parent string, encrypt bool) []ObjLabelResp {
|
||||
labels = labelsByName[obj.GetName()]
|
||||
}
|
||||
thumb, _ := model.GetThumb(obj)
|
||||
storageClass, _ := model.GetStorageClass(obj)
|
||||
resp = append(resp, ObjLabelResp{
|
||||
Id: obj.GetID(),
|
||||
Path: obj.GetPath(),
|
||||
Name: obj.GetName(),
|
||||
Size: obj.GetSize(),
|
||||
IsDir: obj.IsDir(),
|
||||
Modified: obj.ModTime(),
|
||||
Created: obj.CreateTime(),
|
||||
HashInfoStr: obj.GetHash().String(),
|
||||
HashInfo: obj.GetHash().Export(),
|
||||
Sign: common.Sign(obj, parent, encrypt),
|
||||
Thumb: thumb,
|
||||
Type: utils.GetObjType(obj.GetName(), obj.IsDir()),
|
||||
LabelList: labels,
|
||||
Id: obj.GetID(),
|
||||
Path: obj.GetPath(),
|
||||
Name: obj.GetName(),
|
||||
Size: obj.GetSize(),
|
||||
IsDir: obj.IsDir(),
|
||||
Modified: obj.ModTime(),
|
||||
Created: obj.CreateTime(),
|
||||
HashInfoStr: obj.GetHash().String(),
|
||||
HashInfo: obj.GetHash().Export(),
|
||||
Sign: common.Sign(obj, parent, encrypt),
|
||||
Thumb: thumb,
|
||||
Type: utils.GetObjType(obj.GetName(), obj.IsDir()),
|
||||
LabelList: labels,
|
||||
StorageClass: storageClass,
|
||||
})
|
||||
}
|
||||
return resp
|
||||
@@ -374,20 +378,22 @@ func FsGet(c *gin.Context) {
|
||||
}
|
||||
parentMeta, _ := op.GetNearestMeta(parentPath)
|
||||
thumb, _ := model.GetThumb(obj)
|
||||
storageClass, _ := model.GetStorageClass(obj)
|
||||
common.SuccessResp(c, FsGetResp{
|
||||
ObjResp: ObjResp{
|
||||
Id: obj.GetID(),
|
||||
Path: obj.GetPath(),
|
||||
Name: obj.GetName(),
|
||||
Size: obj.GetSize(),
|
||||
IsDir: obj.IsDir(),
|
||||
Modified: obj.ModTime(),
|
||||
Created: obj.CreateTime(),
|
||||
HashInfoStr: obj.GetHash().String(),
|
||||
HashInfo: obj.GetHash().Export(),
|
||||
Sign: common.Sign(obj, parentPath, isEncrypt(meta, reqPath)),
|
||||
Type: utils.GetFileType(obj.GetName()),
|
||||
Thumb: thumb,
|
||||
Id: obj.GetID(),
|
||||
Path: obj.GetPath(),
|
||||
Name: obj.GetName(),
|
||||
Size: obj.GetSize(),
|
||||
IsDir: obj.IsDir(),
|
||||
Modified: obj.ModTime(),
|
||||
Created: obj.CreateTime(),
|
||||
HashInfoStr: obj.GetHash().String(),
|
||||
HashInfo: obj.GetHash().Export(),
|
||||
Sign: common.Sign(obj, parentPath, isEncrypt(meta, reqPath)),
|
||||
Type: utils.GetFileType(obj.GetName()),
|
||||
Thumb: thumb,
|
||||
StorageClass: storageClass,
|
||||
},
|
||||
RawURL: rawURL,
|
||||
Readme: getReadme(meta, reqPath),
|
||||
|
||||
@@ -220,6 +220,7 @@ func SetupTaskRoute(g *gin.RouterGroup) {
|
||||
taskRoute(g.Group("/copy"), fs.CopyTaskManager)
|
||||
taskRoute(g.Group("/offline_download"), tool.DownloadTaskManager)
|
||||
taskRoute(g.Group("/offline_download_transfer"), tool.TransferTaskManager)
|
||||
taskRoute(g.Group("/s3_transition"), fs.S3TransitionTaskManager)
|
||||
taskRoute(g.Group("/decompress"), fs.ArchiveDownloadTaskManager)
|
||||
taskRoute(g.Group("/decompress_upload"), fs.ArchiveContentUploadTaskManager)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user