mirror of
https://github.com/sun-guannan/CapCutAPI.git
synced 2025-11-24 19:13:01 +08:00
142 lines
4.9 KiB
Python
142 lines
4.9 KiB
Python
from collections import OrderedDict
|
|
import threading
|
|
from typing import Dict, Any
|
|
|
|
# Using OrderedDict to implement LRU cache, limiting the maximum number to 1000
|
|
DRAFT_TASKS: Dict[str, dict] = OrderedDict() # Using Dict for type hinting
|
|
MAX_TASKS_CACHE_SIZE = 1000
|
|
|
|
|
|
def update_tasks_cache(task_id: str, task_status: dict) -> None:
|
|
"""Update task status LRU cache
|
|
|
|
:param task_id: Task ID
|
|
:param task_status: Task status information dictionary
|
|
"""
|
|
|
|
if task_id in DRAFT_TASKS:
|
|
# If the key exists, delete the old item
|
|
DRAFT_TASKS.pop(task_id)
|
|
elif len(DRAFT_TASKS) >= MAX_TASKS_CACHE_SIZE:
|
|
# If the cache is full, delete the least recently used item (the first item)
|
|
DRAFT_TASKS.popitem(last=False)
|
|
# Add new item to the end (most recently used)
|
|
DRAFT_TASKS[task_id] = task_status
|
|
|
|
def update_task_field(task_id: str, field: str, value: Any) -> None:
|
|
"""Update a single field in the task status
|
|
|
|
:param task_id: Task ID
|
|
:param field: Field name to update
|
|
:param value: New value for the field
|
|
"""
|
|
if task_id in DRAFT_TASKS:
|
|
# Copy the current status, modify the specified field, then update the cache
|
|
task_status = DRAFT_TASKS[task_id].copy()
|
|
task_status[field] = value
|
|
# Delete the old item and add the updated item
|
|
DRAFT_TASKS.pop(task_id)
|
|
DRAFT_TASKS[task_id] = task_status
|
|
else:
|
|
# If the task doesn't exist, create a default status and set the specified field
|
|
task_status = {
|
|
"status": "initialized",
|
|
"message": "Task initialized",
|
|
"progress": 0,
|
|
"completed_files": 0,
|
|
"total_files": 0,
|
|
"draft_url": ""
|
|
}
|
|
task_status[field] = value
|
|
# If the cache is full, delete the least recently used item
|
|
if len(DRAFT_TASKS) >= MAX_TASKS_CACHE_SIZE:
|
|
DRAFT_TASKS.popitem(last=False)
|
|
# Add new item
|
|
DRAFT_TASKS[task_id] = task_status
|
|
|
|
def update_task_fields(task_id: str, **fields) -> None:
|
|
"""Update multiple fields in the task status
|
|
|
|
:param task_id: Task ID
|
|
:param fields: Fields to update and their values, provided as keyword arguments
|
|
"""
|
|
if task_id in DRAFT_TASKS:
|
|
# Copy the current status, modify the specified fields, then update the cache
|
|
task_status = DRAFT_TASKS[task_id].copy()
|
|
for field, value in fields.items():
|
|
task_status[field] = value
|
|
# Delete the old item and add the updated item
|
|
DRAFT_TASKS.pop(task_id)
|
|
DRAFT_TASKS[task_id] = task_status
|
|
else:
|
|
# If the task doesn't exist, create a default status and set the specified fields
|
|
task_status = {
|
|
"status": "initialized",
|
|
"message": "Task initialized",
|
|
"progress": 0,
|
|
"completed_files": 0,
|
|
"total_files": 0,
|
|
"draft_url": ""
|
|
}
|
|
for field, value in fields.items():
|
|
task_status[field] = value
|
|
# If the cache is full, delete the least recently used item
|
|
if len(DRAFT_TASKS) >= MAX_TASKS_CACHE_SIZE:
|
|
DRAFT_TASKS.popitem(last=False)
|
|
# Add new item
|
|
DRAFT_TASKS[task_id] = task_status
|
|
|
|
def increment_task_field(task_id: str, field: str, increment: int = 1) -> None:
|
|
"""Increment a numeric field in the task status
|
|
|
|
:param task_id: Task ID
|
|
:param field: Field name to increment
|
|
:param increment: Value to increment by, default is 1
|
|
"""
|
|
if task_id in DRAFT_TASKS:
|
|
# Copy the current status, increment the specified field, then update the cache
|
|
task_status = DRAFT_TASKS[task_id].copy()
|
|
if field in task_status and isinstance(task_status[field], (int, float)):
|
|
task_status[field] += increment
|
|
else:
|
|
task_status[field] = increment
|
|
# Delete the old item and add the updated item
|
|
DRAFT_TASKS.pop(task_id)
|
|
DRAFT_TASKS[task_id] = task_status
|
|
|
|
def get_task_status(task_id: str) -> dict:
|
|
"""Get task status
|
|
|
|
:param task_id: Task ID
|
|
:return: Task status information dictionary
|
|
"""
|
|
task_status = DRAFT_TASKS.get(task_id, {
|
|
"status": "not_found",
|
|
"message": "Task does not exist",
|
|
"progress": 0,
|
|
"completed_files": 0,
|
|
"total_files": 0,
|
|
"draft_url": ""
|
|
})
|
|
|
|
# If the task is found, update its position in the LRU cache
|
|
if task_id in DRAFT_TASKS:
|
|
# First delete, then add to the end, implementing LRU update
|
|
update_tasks_cache(task_id, task_status)
|
|
|
|
return task_status
|
|
|
|
def create_task(task_id: str) -> None:
|
|
"""Create a new task and initialize its status
|
|
|
|
:param task_id: Task ID
|
|
"""
|
|
task_status = {
|
|
"status": "initialized",
|
|
"message": "Task initialized",
|
|
"progress": 0,
|
|
"completed_files": 0,
|
|
"total_files": 0,
|
|
"draft_url": ""
|
|
}
|
|
update_tasks_cache(task_id, task_status) |