diff --git a/drivers/s3/driver.go b/drivers/s3/driver.go index b7411489..7825ca6f 100644 --- a/drivers/s3/driver.go +++ b/drivers/s3/driver.go @@ -15,6 +15,7 @@ import ( "github.com/alist-org/alist/v3/internal/stream" "github.com/alist-org/alist/v3/pkg/cron" "github.com/alist-org/alist/v3/server/common" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -32,6 +33,33 @@ type S3 struct { cron *cron.Cron } +var storageClassLookup = map[string]string{ + "standard": s3.ObjectStorageClassStandard, + "reduced_redundancy": s3.ObjectStorageClassReducedRedundancy, + "glacier": s3.ObjectStorageClassGlacier, + "standard_ia": s3.ObjectStorageClassStandardIa, + "onezone_ia": s3.ObjectStorageClassOnezoneIa, + "intelligent_tiering": s3.ObjectStorageClassIntelligentTiering, + "deep_archive": s3.ObjectStorageClassDeepArchive, + "outposts": s3.ObjectStorageClassOutposts, + "glacier_ir": s3.ObjectStorageClassGlacierIr, + "snow": s3.ObjectStorageClassSnow, + "express_onezone": s3.ObjectStorageClassExpressOnezone, +} + +func (d *S3) resolveStorageClass() *string { + value := strings.TrimSpace(d.StorageClass) + if value == "" { + return nil + } + normalized := strings.ToLower(strings.ReplaceAll(value, "-", "_")) + if v, ok := storageClassLookup[normalized]; ok { + return aws.String(v) + } + log.Warnf("s3: unknown storage class %q, using raw value", d.StorageClass) + return aws.String(value) +} + func (d *S3) Config() driver.Config { return d.config } @@ -179,8 +207,14 @@ func (d *S3) Put(ctx context.Context, dstDir model.Obj, s model.FileStreamer, up }), ContentType: &contentType, } + if storageClass := d.resolveStorageClass(); storageClass != nil { + input.StorageClass = storageClass + } _, err := uploader.UploadWithContext(ctx, input) return err } -var _ driver.Driver = (*S3)(nil) +var ( + _ driver.Driver = (*S3)(nil) + _ driver.Other = (*S3)(nil) +) diff --git a/drivers/s3/meta.go b/drivers/s3/meta.go index 4de4b60a..89d723b6 100644 --- a/drivers/s3/meta.go +++ b/drivers/s3/meta.go @@ -21,6 +21,7 @@ type Addition struct { ListObjectVersion string `json:"list_object_version" type:"select" options:"v1,v2" default:"v1"` RemoveBucket bool `json:"remove_bucket" help:"Remove bucket name from path when using custom host."` AddFilenameToDisposition bool `json:"add_filename_to_disposition" help:"Add filename to Content-Disposition header."` + StorageClass string `json:"storage_class" type:"select" options:",standard,standard_ia,onezone_ia,intelligent_tiering,glacier,glacier_ir,deep_archive,archive" help:"Storage class for new objects. AWS and Tencent COS support different subsets (COS uses ARCHIVE/DEEP_ARCHIVE)."` } func init() { diff --git a/drivers/s3/other.go b/drivers/s3/other.go new file mode 100644 index 00000000..e299dae0 --- /dev/null +++ b/drivers/s3/other.go @@ -0,0 +1,286 @@ +package s3 + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strings" + "time" + + "github.com/alist-org/alist/v3/internal/errs" + "github.com/alist-org/alist/v3/internal/model" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" +) + +const ( + OtherMethodArchive = "archive" + OtherMethodArchiveStatus = "archive_status" + OtherMethodThaw = "thaw" + OtherMethodThawStatus = "thaw_status" +) + +type ArchiveRequest struct { + StorageClass string `json:"storage_class"` +} + +type ThawRequest struct { + Days int64 `json:"days"` + Tier string `json:"tier"` +} + +type ObjectDescriptor struct { + Path string `json:"path"` + Bucket string `json:"bucket"` + Key string `json:"key"` +} + +type ArchiveResponse struct { + Action string `json:"action"` + Object ObjectDescriptor `json:"object"` + StorageClass string `json:"storage_class"` + RequestID string `json:"request_id,omitempty"` + VersionID string `json:"version_id,omitempty"` + ETag string `json:"etag,omitempty"` + LastModified string `json:"last_modified,omitempty"` +} + +type ThawResponse struct { + Action string `json:"action"` + Object ObjectDescriptor `json:"object"` + RequestID string `json:"request_id,omitempty"` + Status *RestoreStatus `json:"status,omitempty"` +} + +type RestoreStatus struct { + Ongoing bool `json:"ongoing"` + Expiry string `json:"expiry,omitempty"` + Raw string `json:"raw"` +} + +func (d *S3) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) { + if args.Obj == nil { + return nil, fmt.Errorf("missing object reference") + } + if args.Obj.IsDir() { + return nil, errs.NotSupport + } + + switch strings.ToLower(strings.TrimSpace(args.Method)) { + case "archive": + return d.archive(ctx, args) + case "archive_status": + return d.archiveStatus(ctx, args) + case "thaw": + return d.thaw(ctx, args) + case "thaw_status": + return d.thawStatus(ctx, args) + default: + return nil, errs.NotSupport + } +} + +func (d *S3) archive(ctx context.Context, args model.OtherArgs) (interface{}, error) { + key := getKey(args.Obj.GetPath(), false) + payload := ArchiveRequest{} + if err := DecodeOtherArgs(args.Data, &payload); err != nil { + return nil, fmt.Errorf("parse archive request: %w", err) + } + if payload.StorageClass == "" { + return nil, fmt.Errorf("storage_class is required") + } + storageClass := NormalizeStorageClass(payload.StorageClass) + input := &s3.CopyObjectInput{ + Bucket: &d.Bucket, + Key: &key, + CopySource: aws.String(url.PathEscape(d.Bucket + "/" + key)), + MetadataDirective: aws.String(s3.MetadataDirectiveCopy), + StorageClass: aws.String(storageClass), + } + copyReq, output := d.client.CopyObjectRequest(input) + copyReq.SetContext(ctx) + if err := copyReq.Send(); err != nil { + return nil, err + } + + resp := ArchiveResponse{ + Action: "archive", + Object: d.describeObject(args.Obj, key), + StorageClass: storageClass, + RequestID: copyReq.RequestID, + } + if output.VersionId != nil { + resp.VersionID = aws.StringValue(output.VersionId) + } + if result := output.CopyObjectResult; result != nil { + resp.ETag = aws.StringValue(result.ETag) + if result.LastModified != nil { + resp.LastModified = result.LastModified.UTC().Format(time.RFC3339) + } + } + if status, err := d.describeObjectStatus(ctx, key); err == nil { + if status.StorageClass != "" { + resp.StorageClass = status.StorageClass + } + } + return resp, nil +} + +func (d *S3) archiveStatus(ctx context.Context, args model.OtherArgs) (interface{}, error) { + key := getKey(args.Obj.GetPath(), false) + status, err := d.describeObjectStatus(ctx, key) + if err != nil { + return nil, err + } + return ArchiveResponse{ + Action: "archive_status", + Object: d.describeObject(args.Obj, key), + StorageClass: status.StorageClass, + }, nil +} + +func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error) { + key := getKey(args.Obj.GetPath(), false) + payload := ThawRequest{Days: 1} + if err := DecodeOtherArgs(args.Data, &payload); err != nil { + return nil, fmt.Errorf("parse thaw request: %w", err) + } + if payload.Days <= 0 { + payload.Days = 1 + } + restoreRequest := &s3.RestoreRequest{ + Days: aws.Int64(payload.Days), + } + if tier := NormalizeRestoreTier(payload.Tier); tier != "" { + restoreRequest.GlacierJobParameters = &s3.GlacierJobParameters{Tier: aws.String(tier)} + } + input := &s3.RestoreObjectInput{ + Bucket: &d.Bucket, + Key: &key, + RestoreRequest: restoreRequest, + } + restoreReq, _ := d.client.RestoreObjectRequest(input) + restoreReq.SetContext(ctx) + if err := restoreReq.Send(); err != nil { + return nil, err + } + status, _ := d.describeObjectStatus(ctx, key) + resp := ThawResponse{ + Action: "thaw", + Object: d.describeObject(args.Obj, key), + RequestID: restoreReq.RequestID, + } + if status != nil { + resp.Status = status.Restore + } + return resp, nil +} + +func (d *S3) thawStatus(ctx context.Context, args model.OtherArgs) (interface{}, error) { + key := getKey(args.Obj.GetPath(), false) + status, err := d.describeObjectStatus(ctx, key) + if err != nil { + return nil, err + } + return ThawResponse{ + Action: "thaw_status", + Object: d.describeObject(args.Obj, key), + Status: status.Restore, + }, nil +} + +func (d *S3) describeObject(obj model.Obj, key string) ObjectDescriptor { + return ObjectDescriptor{ + Path: obj.GetPath(), + Bucket: d.Bucket, + Key: key, + } +} + +type objectStatus struct { + StorageClass string + Restore *RestoreStatus +} + +func (d *S3) describeObjectStatus(ctx context.Context, key string) (*objectStatus, error) { + head, err := d.client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{Bucket: &d.Bucket, Key: &key}) + if err != nil { + return nil, err + } + status := &objectStatus{ + StorageClass: aws.StringValue(head.StorageClass), + Restore: parseRestoreHeader(head.Restore), + } + return status, nil +} + +func parseRestoreHeader(header *string) *RestoreStatus { + if header == nil { + return nil + } + value := strings.TrimSpace(*header) + if value == "" { + return nil + } + status := &RestoreStatus{Raw: value} + parts := strings.Split(value, ",") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + continue + } + if strings.HasPrefix(part, "ongoing-request=") { + status.Ongoing = strings.Contains(part, "\"true\"") + } + if strings.HasPrefix(part, "expiry-date=") { + expiry := strings.Trim(part[len("expiry-date="):], "\"") + if expiry != "" { + if t, err := time.Parse(time.RFC1123, expiry); err == nil { + status.Expiry = t.UTC().Format(time.RFC3339) + } else { + status.Expiry = expiry + } + } + } + } + return status +} + +func DecodeOtherArgs(data interface{}, target interface{}) error { + if data == nil { + return nil + } + raw, err := json.Marshal(data) + if err != nil { + return err + } + return json.Unmarshal(raw, target) +} + +func NormalizeStorageClass(value string) string { + normalized := strings.ToLower(strings.TrimSpace(strings.ReplaceAll(value, "-", "_"))) + if normalized == "" { + return value + } + if v, ok := storageClassLookup[normalized]; ok { + return v + } + return value +} + +func NormalizeRestoreTier(value string) string { + normalized := strings.ToLower(strings.TrimSpace(value)) + switch normalized { + case "", "default": + return "" + case "bulk": + return s3.TierBulk + case "standard": + return s3.TierStandard + case "expedited": + return s3.TierExpedited + default: + return value + } +} diff --git a/drivers/s3/util.go b/drivers/s3/util.go index e02945a0..9d2b285c 100644 --- a/drivers/s3/util.go +++ b/drivers/s3/util.go @@ -109,13 +109,13 @@ func (d *S3) listV1(prefix string, args model.ListArgs) ([]model.Obj, error) { if !args.S3ShowPlaceholder && (name == getPlaceholderName(d.Placeholder) || name == d.Placeholder) { continue } - file := model.Object{ + file := &model.Object{ //Id: *object.Key, Name: name, Size: *object.Size, Modified: *object.LastModified, } - files = append(files, &file) + files = append(files, model.WrapObjStorageClass(file, aws.StringValue(object.StorageClass))) } if listObjectsResult.IsTruncated == nil { return nil, errors.New("IsTruncated nil") @@ -164,13 +164,13 @@ func (d *S3) listV2(prefix string, args model.ListArgs) ([]model.Obj, error) { if !args.S3ShowPlaceholder && (name == getPlaceholderName(d.Placeholder) || name == d.Placeholder) { continue } - file := model.Object{ + file := &model.Object{ //Id: *object.Key, Name: name, Size: *object.Size, Modified: *object.LastModified, } - files = append(files, &file) + files = append(files, model.WrapObjStorageClass(file, aws.StringValue(object.StorageClass))) } if !aws.BoolValue(listObjectsResult.IsTruncated) { break @@ -202,6 +202,9 @@ func (d *S3) copyFile(ctx context.Context, src string, dst string) error { CopySource: aws.String(url.PathEscape(d.Bucket + "/" + srcKey)), Key: &dstKey, } + if storageClass := d.resolveStorageClass(); storageClass != nil { + input.StorageClass = storageClass + } _, err := d.client.CopyObject(input) return err } diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go index c67e3029..8f05b84a 100644 --- a/internal/bootstrap/task.go +++ b/internal/bootstrap/task.go @@ -37,6 +37,18 @@ func InitTaskManager() { if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted CleanTempDir() } + workers := conf.Conf.Tasks.S3Transition.Workers + if workers < 0 { + workers = 0 + } + fs.S3TransitionTaskManager = tache.NewManager[*fs.S3TransitionTask]( + tache.WithWorks(workers), + tache.WithPersistFunction( + db.GetTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant), + db.UpdateTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant), + ), + tache.WithMaxRetry(conf.Conf.Tasks.S3Transition.MaxRetry), + ) fs.ArchiveDownloadTaskManager = tache.NewManager[*fs.ArchiveDownloadTask](tache.WithWorks(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant), db.UpdateTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Decompress.MaxRetry)) op.RegisterSettingChangingCallback(func() { fs.ArchiveDownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers))) diff --git a/internal/conf/config.go b/internal/conf/config.go index cdb86fee..cf7cde0b 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -60,6 +60,7 @@ type TasksConfig struct { Copy TaskConfig `json:"copy" envPrefix:"COPY_"` Decompress TaskConfig `json:"decompress" envPrefix:"DECOMPRESS_"` DecompressUpload TaskConfig `json:"decompress_upload" envPrefix:"DECOMPRESS_UPLOAD_"` + S3Transition TaskConfig `json:"s3_transition" envPrefix:"S3_TRANSITION_"` AllowRetryCanceled bool `json:"allow_retry_canceled" env:"ALLOW_RETRY_CANCELED"` } @@ -184,6 +185,11 @@ func DefaultConfig() *Config { Workers: 5, MaxRetry: 2, }, + S3Transition: TaskConfig{ + Workers: 5, + MaxRetry: 2, + // TaskPersistant: true, + }, AllowRetryCanceled: false, }, Cors: Cors{ diff --git a/internal/fs/other.go b/internal/fs/other.go index 85b7b1d1..14f8f63d 100644 --- a/internal/fs/other.go +++ b/internal/fs/other.go @@ -2,10 +2,15 @@ package fs import ( "context" + "encoding/json" + stdpath "path" + "strings" + "github.com/alist-org/alist/v3/drivers/s3" "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/internal/task" "github.com/pkg/errors" ) @@ -53,6 +58,38 @@ func other(ctx context.Context, args model.FsOtherArgs) (interface{}, error) { if err != nil { return nil, errors.WithMessage(err, "failed get storage") } + originalPath := args.Path + + if _, ok := storage.(*s3.S3); ok { + method := strings.ToLower(strings.TrimSpace(args.Method)) + if method == s3.OtherMethodArchive || method == s3.OtherMethodThaw { + if S3TransitionTaskManager == nil { + return nil, errors.New("s3 transition task manager is not initialized") + } + var payload json.RawMessage + if args.Data != nil { + raw, err := json.Marshal(args.Data) + if err != nil { + return nil, errors.WithMessage(err, "failed to encode request payload") + } + payload = raw + } + taskCreator, _ := ctx.Value("user").(*model.User) + tsk := &S3TransitionTask{ + TaskExtension: task.TaskExtension{Creator: taskCreator}, + status: "queued", + StorageMountPath: storage.GetStorage().MountPath, + ObjectPath: actualPath, + DisplayPath: originalPath, + ObjectName: stdpath.Base(actualPath), + Transition: method, + Payload: payload, + } + S3TransitionTaskManager.Add(tsk) + return map[string]string{"task_id": tsk.GetID()}, nil + } + } + args.Path = actualPath return op.Other(ctx, storage, args) } diff --git a/internal/fs/s3_transition.go b/internal/fs/s3_transition.go new file mode 100644 index 00000000..395f3c5b --- /dev/null +++ b/internal/fs/s3_transition.go @@ -0,0 +1,310 @@ +package fs + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/alist-org/alist/v3/drivers/s3" + "github.com/alist-org/alist/v3/internal/driver" + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/internal/task" + "github.com/pkg/errors" + "github.com/xhofe/tache" +) + +const s3TransitionPollInterval = 15 * time.Second + +// S3TransitionTask represents an asynchronous S3 archive/thaw request that is +// tracked via the task manager so that clients can monitor the progress of the +// operation. +type S3TransitionTask struct { + task.TaskExtension + status string + + StorageMountPath string `json:"storage_mount_path"` + ObjectPath string `json:"object_path"` + DisplayPath string `json:"display_path"` + ObjectName string `json:"object_name"` + Transition string `json:"transition"` + Payload json.RawMessage `json:"payload,omitempty"` + + TargetStorageClass string `json:"target_storage_class,omitempty"` + RequestID string `json:"request_id,omitempty"` + VersionID string `json:"version_id,omitempty"` + + storage driver.Driver `json:"-"` +} + +// S3TransitionTaskManager holds asynchronous S3 archive/thaw tasks. +var S3TransitionTaskManager *tache.Manager[*S3TransitionTask] + +var _ task.TaskExtensionInfo = (*S3TransitionTask)(nil) + +func (t *S3TransitionTask) GetName() string { + action := strings.ToLower(t.Transition) + if action == "" { + action = "transition" + } + display := t.DisplayPath + if display == "" { + display = t.ObjectPath + } + if display == "" { + display = t.ObjectName + } + return fmt.Sprintf("s3 %s %s", action, display) +} + +func (t *S3TransitionTask) GetStatus() string { + return t.status +} + +func (t *S3TransitionTask) Run() error { + t.ReinitCtx() + t.ClearEndTime() + start := time.Now() + t.SetStartTime(start) + defer func() { t.SetEndTime(time.Now()) }() + + if err := t.ensureStorage(); err != nil { + t.status = fmt.Sprintf("locate storage failed: %v", err) + return err + } + + payload, err := t.decodePayload() + if err != nil { + t.status = fmt.Sprintf("decode payload failed: %v", err) + return err + } + + method := strings.ToLower(strings.TrimSpace(t.Transition)) + switch method { + case s3.OtherMethodArchive: + t.status = "submitting archive request" + t.SetProgress(0) + resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodArchive, + Data: payload, + }) + if err != nil { + t.status = fmt.Sprintf("archive request failed: %v", err) + return err + } + archiveResp, ok := toArchiveResponse(resp) + if ok { + if t.TargetStorageClass == "" { + t.TargetStorageClass = archiveResp.StorageClass + } + t.RequestID = archiveResp.RequestID + t.VersionID = archiveResp.VersionID + if archiveResp.StorageClass != "" { + t.status = fmt.Sprintf("archive requested, waiting for %s", archiveResp.StorageClass) + } else { + t.status = "archive requested" + } + } else if sc := t.extractTargetStorageClass(); sc != "" { + t.TargetStorageClass = sc + t.status = fmt.Sprintf("archive requested, waiting for %s", sc) + } else { + t.status = "archive requested" + } + if t.TargetStorageClass != "" { + t.TargetStorageClass = s3.NormalizeStorageClass(t.TargetStorageClass) + } + t.SetProgress(25) + return t.waitForArchive() + case s3.OtherMethodThaw: + t.status = "submitting thaw request" + t.SetProgress(0) + resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodThaw, + Data: payload, + }) + if err != nil { + t.status = fmt.Sprintf("thaw request failed: %v", err) + return err + } + thawResp, ok := toThawResponse(resp) + if ok { + t.RequestID = thawResp.RequestID + if thawResp.Status != nil && !thawResp.Status.Ongoing { + t.SetProgress(100) + t.status = thawCompletionMessage(thawResp.Status) + return nil + } + } + t.status = "thaw requested" + t.SetProgress(25) + return t.waitForThaw() + default: + return errors.Errorf("unsupported transition method: %s", t.Transition) + } +} + +func (t *S3TransitionTask) ensureStorage() error { + if t.storage != nil { + return nil + } + storage, err := op.GetStorageByMountPath(t.StorageMountPath) + if err != nil { + return err + } + t.storage = storage + return nil +} + +func (t *S3TransitionTask) decodePayload() (interface{}, error) { + if len(t.Payload) == 0 { + return nil, nil + } + var payload interface{} + if err := json.Unmarshal(t.Payload, &payload); err != nil { + return nil, err + } + return payload, nil +} + +func (t *S3TransitionTask) extractTargetStorageClass() string { + if len(t.Payload) == 0 { + return "" + } + var req s3.ArchiveRequest + if err := json.Unmarshal(t.Payload, &req); err != nil { + return "" + } + return s3.NormalizeStorageClass(req.StorageClass) +} + +func (t *S3TransitionTask) waitForArchive() error { + ticker := time.NewTicker(s3TransitionPollInterval) + defer ticker.Stop() + + ctx := t.Ctx() + for { + select { + case <-ctx.Done(): + t.status = "archive canceled" + return ctx.Err() + case <-ticker.C: + resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodArchiveStatus, + }) + if err != nil { + t.status = fmt.Sprintf("archive status error: %v", err) + return err + } + archiveResp, ok := toArchiveResponse(resp) + if !ok { + t.status = fmt.Sprintf("unexpected archive status response: %T", resp) + return errors.Errorf("unexpected archive status response: %T", resp) + } + currentClass := strings.TrimSpace(archiveResp.StorageClass) + target := strings.TrimSpace(t.TargetStorageClass) + if target == "" { + target = currentClass + t.TargetStorageClass = currentClass + } + if currentClass == "" { + t.status = "waiting for storage class update" + t.SetProgress(50) + continue + } + if strings.EqualFold(currentClass, target) { + t.SetProgress(100) + t.status = fmt.Sprintf("archive complete (%s)", currentClass) + return nil + } + t.status = fmt.Sprintf("storage class %s (target %s)", currentClass, target) + t.SetProgress(75) + } + } +} + +func (t *S3TransitionTask) waitForThaw() error { + ticker := time.NewTicker(s3TransitionPollInterval) + defer ticker.Stop() + + ctx := t.Ctx() + for { + select { + case <-ctx.Done(): + t.status = "thaw canceled" + return ctx.Err() + case <-ticker.C: + resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodThawStatus, + }) + if err != nil { + t.status = fmt.Sprintf("thaw status error: %v", err) + return err + } + thawResp, ok := toThawResponse(resp) + if !ok { + t.status = fmt.Sprintf("unexpected thaw status response: %T", resp) + return errors.Errorf("unexpected thaw status response: %T", resp) + } + status := thawResp.Status + if status == nil { + t.status = "waiting for thaw status" + t.SetProgress(50) + continue + } + if status.Ongoing { + t.status = fmt.Sprintf("thaw in progress (%s)", status.Raw) + t.SetProgress(75) + continue + } + t.SetProgress(100) + t.status = thawCompletionMessage(status) + return nil + } + } +} + +func thawCompletionMessage(status *s3.RestoreStatus) string { + if status == nil { + return "thaw complete" + } + if status.Expiry != "" { + return fmt.Sprintf("thaw complete, expires %s", status.Expiry) + } + return "thaw complete" +} + +func toArchiveResponse(v interface{}) (s3.ArchiveResponse, bool) { + switch resp := v.(type) { + case s3.ArchiveResponse: + return resp, true + case *s3.ArchiveResponse: + if resp != nil { + return *resp, true + } + } + return s3.ArchiveResponse{}, false +} + +func toThawResponse(v interface{}) (s3.ThawResponse, bool) { + switch resp := v.(type) { + case s3.ThawResponse: + return resp, true + case *s3.ThawResponse: + if resp != nil { + return *resp, true + } + } + return s3.ThawResponse{}, false +} + +// Ensure compatibility with persistence when tasks are restored. +func (t *S3TransitionTask) OnRestore() { + // The storage handle is not persisted intentionally; it will be lazily + // re-fetched on the next Run invocation. + t.storage = nil +} diff --git a/internal/model/obj.go b/internal/model/obj.go index 93fa7a96..ed4e0451 100644 --- a/internal/model/obj.go +++ b/internal/model/obj.go @@ -20,6 +20,10 @@ type ObjUnwrap interface { Unwrap() Obj } +type StorageClassProvider interface { + StorageClass() string +} + type Obj interface { GetSize() int64 GetName() string @@ -141,6 +145,13 @@ func WrapObjsName(objs []Obj) { } } +func WrapObjStorageClass(obj Obj, storageClass string) Obj { + if storageClass == "" { + return obj + } + return &ObjWrapStorageClass{Obj: obj, storageClass: storageClass} +} + func UnwrapObj(obj Obj) Obj { if unwrap, ok := obj.(ObjUnwrap); ok { obj = unwrap.Unwrap() @@ -168,6 +179,20 @@ func GetUrl(obj Obj) (url string, ok bool) { return url, false } +func GetStorageClass(obj Obj) (string, bool) { + if provider, ok := obj.(StorageClassProvider); ok { + value := provider.StorageClass() + if value == "" { + return "", false + } + return value, true + } + if unwrap, ok := obj.(ObjUnwrap); ok { + return GetStorageClass(unwrap.Unwrap()) + } + return "", false +} + func GetRawObject(obj Obj) *Object { switch v := obj.(type) { case *ObjThumbURL: diff --git a/internal/model/object.go b/internal/model/object.go index c8c10bb9..1617662c 100644 --- a/internal/model/object.go +++ b/internal/model/object.go @@ -11,6 +11,11 @@ type ObjWrapName struct { Obj } +type ObjWrapStorageClass struct { + storageClass string + Obj +} + func (o *ObjWrapName) Unwrap() Obj { return o.Obj } @@ -19,6 +24,20 @@ func (o *ObjWrapName) GetName() string { return o.Name } +func (o *ObjWrapStorageClass) Unwrap() Obj { + return o.Obj +} + +func (o *ObjWrapStorageClass) StorageClass() string { + return o.storageClass +} + +func (o *ObjWrapStorageClass) SetPath(path string) { + if setter, ok := o.Obj.(SetPath); ok { + setter.SetPath(path) + } +} + type Object struct { ID string Path string diff --git a/server/handles/archive.go b/server/handles/archive.go index 0bb8d94a..5787897c 100644 --- a/server/handles/archive.go +++ b/server/handles/archive.go @@ -44,17 +44,19 @@ type ArchiveContentResp struct { } func toObjsRespWithoutSignAndThumb(obj model.Obj) ObjResp { + storageClass, _ := model.GetStorageClass(obj) return ObjResp{ - Name: obj.GetName(), - Size: obj.GetSize(), - IsDir: obj.IsDir(), - Modified: obj.ModTime(), - Created: obj.CreateTime(), - HashInfoStr: obj.GetHash().String(), - HashInfo: obj.GetHash().Export(), - Sign: "", - Thumb: "", - Type: utils.GetObjType(obj.GetName(), obj.IsDir()), + Name: obj.GetName(), + Size: obj.GetSize(), + IsDir: obj.IsDir(), + Modified: obj.ModTime(), + Created: obj.CreateTime(), + HashInfoStr: obj.GetHash().String(), + HashInfo: obj.GetHash().Export(), + Sign: "", + Thumb: "", + Type: utils.GetObjType(obj.GetName(), obj.IsDir()), + StorageClass: storageClass, } } diff --git a/server/handles/fsread.go b/server/handles/fsread.go index 8cf3c9b0..676d64b1 100644 --- a/server/handles/fsread.go +++ b/server/handles/fsread.go @@ -33,18 +33,19 @@ type DirReq struct { } type ObjResp struct { - Id string `json:"id"` - Path string `json:"path"` - Name string `json:"name"` - Size int64 `json:"size"` - IsDir bool `json:"is_dir"` - Modified time.Time `json:"modified"` - Created time.Time `json:"created"` - Sign string `json:"sign"` - Thumb string `json:"thumb"` - Type int `json:"type"` - HashInfoStr string `json:"hashinfo"` - HashInfo map[*utils.HashType]string `json:"hash_info"` + Id string `json:"id"` + Path string `json:"path"` + Name string `json:"name"` + Size int64 `json:"size"` + IsDir bool `json:"is_dir"` + Modified time.Time `json:"modified"` + Created time.Time `json:"created"` + Sign string `json:"sign"` + Thumb string `json:"thumb"` + Type int `json:"type"` + HashInfoStr string `json:"hashinfo"` + HashInfo map[*utils.HashType]string `json:"hash_info"` + StorageClass string `json:"storage_class,omitempty"` } type FsListResp struct { @@ -57,19 +58,20 @@ type FsListResp struct { } type ObjLabelResp struct { - Id string `json:"id"` - Path string `json:"path"` - Name string `json:"name"` - Size int64 `json:"size"` - IsDir bool `json:"is_dir"` - Modified time.Time `json:"modified"` - Created time.Time `json:"created"` - Sign string `json:"sign"` - Thumb string `json:"thumb"` - Type int `json:"type"` - HashInfoStr string `json:"hashinfo"` - HashInfo map[*utils.HashType]string `json:"hash_info"` - LabelList []model.Label `json:"label_list"` + Id string `json:"id"` + Path string `json:"path"` + Name string `json:"name"` + Size int64 `json:"size"` + IsDir bool `json:"is_dir"` + Modified time.Time `json:"modified"` + Created time.Time `json:"created"` + Sign string `json:"sign"` + Thumb string `json:"thumb"` + Type int `json:"type"` + HashInfoStr string `json:"hashinfo"` + HashInfo map[*utils.HashType]string `json:"hash_info"` + LabelList []model.Label `json:"label_list"` + StorageClass string `json:"storage_class,omitempty"` } func FsList(c *gin.Context) { @@ -256,20 +258,22 @@ func toObjsResp(objs []model.Obj, parent string, encrypt bool) []ObjLabelResp { labels = labelsByName[obj.GetName()] } thumb, _ := model.GetThumb(obj) + storageClass, _ := model.GetStorageClass(obj) resp = append(resp, ObjLabelResp{ - Id: obj.GetID(), - Path: obj.GetPath(), - Name: obj.GetName(), - Size: obj.GetSize(), - IsDir: obj.IsDir(), - Modified: obj.ModTime(), - Created: obj.CreateTime(), - HashInfoStr: obj.GetHash().String(), - HashInfo: obj.GetHash().Export(), - Sign: common.Sign(obj, parent, encrypt), - Thumb: thumb, - Type: utils.GetObjType(obj.GetName(), obj.IsDir()), - LabelList: labels, + Id: obj.GetID(), + Path: obj.GetPath(), + Name: obj.GetName(), + Size: obj.GetSize(), + IsDir: obj.IsDir(), + Modified: obj.ModTime(), + Created: obj.CreateTime(), + HashInfoStr: obj.GetHash().String(), + HashInfo: obj.GetHash().Export(), + Sign: common.Sign(obj, parent, encrypt), + Thumb: thumb, + Type: utils.GetObjType(obj.GetName(), obj.IsDir()), + LabelList: labels, + StorageClass: storageClass, }) } return resp @@ -374,20 +378,22 @@ func FsGet(c *gin.Context) { } parentMeta, _ := op.GetNearestMeta(parentPath) thumb, _ := model.GetThumb(obj) + storageClass, _ := model.GetStorageClass(obj) common.SuccessResp(c, FsGetResp{ ObjResp: ObjResp{ - Id: obj.GetID(), - Path: obj.GetPath(), - Name: obj.GetName(), - Size: obj.GetSize(), - IsDir: obj.IsDir(), - Modified: obj.ModTime(), - Created: obj.CreateTime(), - HashInfoStr: obj.GetHash().String(), - HashInfo: obj.GetHash().Export(), - Sign: common.Sign(obj, parentPath, isEncrypt(meta, reqPath)), - Type: utils.GetFileType(obj.GetName()), - Thumb: thumb, + Id: obj.GetID(), + Path: obj.GetPath(), + Name: obj.GetName(), + Size: obj.GetSize(), + IsDir: obj.IsDir(), + Modified: obj.ModTime(), + Created: obj.CreateTime(), + HashInfoStr: obj.GetHash().String(), + HashInfo: obj.GetHash().Export(), + Sign: common.Sign(obj, parentPath, isEncrypt(meta, reqPath)), + Type: utils.GetFileType(obj.GetName()), + Thumb: thumb, + StorageClass: storageClass, }, RawURL: rawURL, Readme: getReadme(meta, reqPath), diff --git a/server/handles/task.go b/server/handles/task.go index 6d49f9e5..a4dbce0f 100644 --- a/server/handles/task.go +++ b/server/handles/task.go @@ -220,6 +220,7 @@ func SetupTaskRoute(g *gin.RouterGroup) { taskRoute(g.Group("/copy"), fs.CopyTaskManager) taskRoute(g.Group("/offline_download"), tool.DownloadTaskManager) taskRoute(g.Group("/offline_download_transfer"), tool.TransferTaskManager) + taskRoute(g.Group("/s3_transition"), fs.S3TransitionTaskManager) taskRoute(g.Group("/decompress"), fs.ArchiveDownloadTaskManager) taskRoute(g.Group("/decompress_upload"), fs.ArchiveContentUploadTaskManager) }