mirror of
https://github.com/sun-guannan/CapCutAPI.git
synced 2025-11-25 03:15:00 +08:00
add create_draft api
This commit is contained in:
@@ -5,43 +5,43 @@ from draft_cache import DRAFT_CACHE, update_cache
|
|||||||
|
|
||||||
def create_draft(width=1080, height=1920):
|
def create_draft(width=1080, height=1920):
|
||||||
"""
|
"""
|
||||||
创建新的剪映草稿
|
Create new CapCut draft
|
||||||
:param width: 视频宽度,默认1080
|
:param width: Video width, default 1080
|
||||||
:param height: 视频高度,默认1920
|
:param height: Video height, default 1920
|
||||||
:return: (draft_name, draft_path, draft_id, draft_url)
|
:return: (draft_name, draft_path, draft_id, draft_url)
|
||||||
"""
|
"""
|
||||||
# 生成时间戳和draft_id
|
# Generate timestamp and draft_id
|
||||||
unix_time = int(time.time())
|
unix_time = int(time.time())
|
||||||
unique_id = uuid.uuid4().hex[:8] # 取UUID的前8位即可
|
unique_id = uuid.uuid4().hex[:8] # Take the first 8 digits of UUID
|
||||||
draft_id = f"dfd_cat_{unix_time}_{unique_id}" # 使用Unix时间戳和UUID组合
|
draft_id = f"dfd_cat_{unix_time}_{unique_id}" # Use Unix timestamp and UUID combination
|
||||||
|
|
||||||
# 创建指定分辨率的剪映草稿
|
# Create CapCut draft with specified resolution
|
||||||
script = draft.Script_file(width, height)
|
script = draft.Script_file(width, height)
|
||||||
|
|
||||||
# 存入全局缓存
|
# Store in global cache
|
||||||
update_cache(draft_id, script)
|
update_cache(draft_id, script)
|
||||||
|
|
||||||
return script, draft_id
|
return script, draft_id
|
||||||
|
|
||||||
def get_or_create_draft(draft_id=None, width=1080, height=1920):
|
def get_or_create_draft(draft_id=None, width=1080, height=1920):
|
||||||
"""
|
"""
|
||||||
获取或创建剪映草稿
|
Get or create CapCut draft
|
||||||
:param draft_id: 草稿ID,如果为None或找不到对应的zip文件,则创建新草稿
|
:param draft_id: Draft ID, if None or corresponding zip file not found, create new draft
|
||||||
:param width: 视频宽度,默认1080
|
:param width: Video width, default 1080
|
||||||
:param height: 视频高度,默认1920
|
:param height: Video height, default 1920
|
||||||
:return: (draft_name, draft_path, draft_id, draft_dir, script)
|
:return: (draft_name, draft_path, draft_id, draft_dir, script)
|
||||||
"""
|
"""
|
||||||
global DRAFT_CACHE # 声明使用全局变量
|
global DRAFT_CACHE # Declare use of global variable
|
||||||
|
|
||||||
if draft_id is not None and draft_id in DRAFT_CACHE:
|
if draft_id is not None and draft_id in DRAFT_CACHE:
|
||||||
# 从缓存中获取已存在的草稿信息
|
# Get existing draft information from cache
|
||||||
print(f"从缓存中获取草稿: {draft_id}")
|
print(f"Getting draft from cache: {draft_id}")
|
||||||
# 更新最近访问时间
|
# Update last access time
|
||||||
update_cache(draft_id, DRAFT_CACHE[draft_id])
|
update_cache(draft_id, DRAFT_CACHE[draft_id])
|
||||||
return draft_id, DRAFT_CACHE[draft_id]
|
return draft_id, DRAFT_CACHE[draft_id]
|
||||||
|
|
||||||
# 创建新草稿逻辑
|
# Create new draft logic
|
||||||
print("创建新草稿")
|
print("Creating new draft")
|
||||||
script, generate_draft_id = create_draft(
|
script, generate_draft_id = create_draft(
|
||||||
width=width,
|
width=width,
|
||||||
height=height,
|
height=height,
|
||||||
|
|||||||
@@ -7,123 +7,123 @@ from urllib.parse import urlparse, unquote
|
|||||||
|
|
||||||
def download_video(video_url, draft_name, material_name):
|
def download_video(video_url, draft_name, material_name):
|
||||||
"""
|
"""
|
||||||
下载视频到指定目录
|
Download video to specified directory
|
||||||
:param video_url: 视频URL
|
:param video_url: Video URL
|
||||||
:param draft_name: 草稿名称
|
:param draft_name: Draft name
|
||||||
:param material_name: 素材名称
|
:param material_name: Material name
|
||||||
:return: 本地视频路径
|
:return: Local video path
|
||||||
"""
|
"""
|
||||||
# 确保目录存在
|
# Ensure directory exists
|
||||||
video_dir = f"{draft_name}/assets/video"
|
video_dir = f"{draft_name}/assets/video"
|
||||||
os.makedirs(video_dir, exist_ok=True)
|
os.makedirs(video_dir, exist_ok=True)
|
||||||
|
|
||||||
# 生成本地文件名
|
# Generate local filename
|
||||||
local_path = f"{video_dir}/{material_name}"
|
local_path = f"{video_dir}/{material_name}"
|
||||||
|
|
||||||
# 检查文件是否已存在
|
# Check if file already exists
|
||||||
if os.path.exists(local_path):
|
if os.path.exists(local_path):
|
||||||
print(f"视频文件已存在: {local_path}")
|
print(f"Video file already exists: {local_path}")
|
||||||
return local_path
|
return local_path
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 使用ffmpeg下载视频
|
# Use ffmpeg to download video
|
||||||
command = [
|
command = [
|
||||||
'ffmpeg',
|
'ffmpeg',
|
||||||
'-i', video_url,
|
'-i', video_url,
|
||||||
'-c', 'copy', # 直接复制,不重新编码
|
'-c', 'copy', # Direct copy, no re-encoding
|
||||||
local_path
|
local_path
|
||||||
]
|
]
|
||||||
subprocess.run(command, check=True, capture_output=True)
|
subprocess.run(command, check=True, capture_output=True)
|
||||||
return local_path
|
return local_path
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
raise Exception(f"下载视频失败: {e.stderr.decode('utf-8')}")
|
raise Exception(f"Failed to download video: {e.stderr.decode('utf-8')}")
|
||||||
|
|
||||||
def download_image(image_url, draft_name, material_name):
|
def download_image(image_url, draft_name, material_name):
|
||||||
"""
|
"""
|
||||||
下载图片到指定目录,并统一转换为PNG格式
|
Download image to specified directory, and convert to PNG format
|
||||||
:param image_url: 图片URL
|
:param image_url: Image URL
|
||||||
:param draft_name: 草稿名称
|
:param draft_name: Draft name
|
||||||
:param material_name: 素材名称
|
:param material_name: Material name
|
||||||
:return: 本地图片路径
|
:return: Local image path
|
||||||
"""
|
"""
|
||||||
# 确保目录存在
|
# Ensure directory exists
|
||||||
image_dir = f"{draft_name}/assets/image"
|
image_dir = f"{draft_name}/assets/image"
|
||||||
os.makedirs(image_dir, exist_ok=True)
|
os.makedirs(image_dir, exist_ok=True)
|
||||||
|
|
||||||
# 统一使用png格式
|
# Uniformly use png format
|
||||||
local_path = f"{image_dir}/{material_name}"
|
local_path = f"{image_dir}/{material_name}"
|
||||||
|
|
||||||
# 检查文件是否已存在
|
# Check if file already exists
|
||||||
if os.path.exists(local_path):
|
if os.path.exists(local_path):
|
||||||
print(f"图片文件已存在: {local_path}")
|
print(f"Image file already exists: {local_path}")
|
||||||
return local_path
|
return local_path
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 使用ffmpeg下载并转换图片为PNG格式
|
# Use ffmpeg to download and convert image to PNG format
|
||||||
command = [
|
command = [
|
||||||
'ffmpeg',
|
'ffmpeg',
|
||||||
'-i', image_url,
|
'-i', image_url,
|
||||||
'-vf', 'format=rgba', # 转换为RGBA格式以支持透明度
|
'-vf', 'format=rgba', # Convert to RGBA format to support transparency
|
||||||
'-frames:v', '1', # 确保只处理一帧
|
'-frames:v', '1', # Ensure only one frame is processed
|
||||||
'-y', # 覆盖已存在的文件
|
'-y', # Overwrite existing files
|
||||||
local_path
|
local_path
|
||||||
]
|
]
|
||||||
subprocess.run(command, check=True, capture_output=True)
|
subprocess.run(command, check=True, capture_output=True)
|
||||||
return local_path
|
return local_path
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
raise Exception(f"下载图片失败: {e.stderr.decode('utf-8')}")
|
raise Exception(f"Failed to download image: {e.stderr.decode('utf-8')}")
|
||||||
|
|
||||||
def download_audio(audio_url, draft_name, material_name):
|
def download_audio(audio_url, draft_name, material_name):
|
||||||
"""
|
"""
|
||||||
下载音频并转码为MP3格式到指定目录
|
Download audio and transcode to MP3 format to specified directory
|
||||||
:param audio_url: 音频URL
|
:param audio_url: Audio URL
|
||||||
:param draft_name: 草稿名称
|
:param draft_name: Draft name
|
||||||
:param material_name: 素材名称
|
:param material_name: Material name
|
||||||
:return: 本地音频路径
|
:return: Local audio path
|
||||||
"""
|
"""
|
||||||
# 确保目录存在
|
# Ensure directory exists
|
||||||
audio_dir = f"{draft_name}/assets/audio"
|
audio_dir = f"{draft_name}/assets/audio"
|
||||||
os.makedirs(audio_dir, exist_ok=True)
|
os.makedirs(audio_dir, exist_ok=True)
|
||||||
|
|
||||||
# 生成本地文件名(保留.mp3后缀)
|
# Generate local filename (keep .mp3 extension)
|
||||||
local_path = f"{audio_dir}/{material_name}"
|
local_path = f"{audio_dir}/{material_name}"
|
||||||
|
|
||||||
# 检查文件是否已存在
|
# Check if file already exists
|
||||||
if os.path.exists(local_path):
|
if os.path.exists(local_path):
|
||||||
print(f"音频文件已存在: {local_path}")
|
print(f"Audio file already exists: {local_path}")
|
||||||
return local_path
|
return local_path
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 使用ffmpeg下载并转码为MP3(关键修改:指定MP3编码器)
|
# Use ffmpeg to download and transcode to MP3 (key modification: specify MP3 encoder)
|
||||||
command = [
|
command = [
|
||||||
'ffmpeg',
|
'ffmpeg',
|
||||||
'-i', audio_url, # 输入URL
|
'-i', audio_url, # Input URL
|
||||||
'-c:a', 'libmp3lame', # 强制将音频流编码为MP3
|
'-c:a', 'libmp3lame', # Force encode audio stream to MP3
|
||||||
'-q:a', '2', # 设置音频质量(0-9,0为最佳,2为平衡质量与文件大小)
|
'-q:a', '2', # Set audio quality (0-9, 0 is best, 2 balances quality and file size)
|
||||||
'-y', # 覆盖已存在文件(可选)
|
'-y', # Overwrite existing files (optional)
|
||||||
local_path # 输出路径
|
local_path # Output path
|
||||||
]
|
]
|
||||||
subprocess.run(command, check=True, capture_output=True, text=True)
|
subprocess.run(command, check=True, capture_output=True, text=True)
|
||||||
return local_path
|
return local_path
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
raise Exception(f"下载音频失败:\n{e.stderr}")
|
raise Exception(f"Failed to download audio:\n{e.stderr}")
|
||||||
|
|
||||||
def download_file(url:str, local_filename, max_retries=3, timeout=180):
|
def download_file(url:str, local_filename, max_retries=3, timeout=180):
|
||||||
# 提取目录部分
|
# Extract directory part
|
||||||
directory = os.path.dirname(local_filename)
|
directory = os.path.dirname(local_filename)
|
||||||
|
|
||||||
retries = 0
|
retries = 0
|
||||||
while retries < max_retries:
|
while retries < max_retries:
|
||||||
try:
|
try:
|
||||||
if retries > 0:
|
if retries > 0:
|
||||||
wait_time = 2 ** retries # 指数退避策略
|
wait_time = 2 ** retries # Exponential backoff strategy
|
||||||
print(f"Retrying in {wait_time} seconds... (Attempt {retries+1}/{max_retries})")
|
print(f"Retrying in {wait_time} seconds... (Attempt {retries+1}/{max_retries})")
|
||||||
time.sleep(wait_time)
|
time.sleep(wait_time)
|
||||||
|
|
||||||
print(f"Downloading file: {local_filename}")
|
print(f"Downloading file: {local_filename}")
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
# 创建目录(如果不存在)
|
# Create directory (if it doesn't exist)
|
||||||
if directory and not os.path.exists(directory):
|
if directory and not os.path.exists(directory):
|
||||||
os.makedirs(directory, exist_ok=True)
|
os.makedirs(directory, exist_ok=True)
|
||||||
print(f"Created directory: {directory}")
|
print(f"Created directory: {directory}")
|
||||||
@@ -143,13 +143,13 @@ def download_file(url:str, local_filename, max_retries=3, timeout=180):
|
|||||||
|
|
||||||
if total_size > 0:
|
if total_size > 0:
|
||||||
progress = bytes_written / total_size * 100
|
progress = bytes_written / total_size * 100
|
||||||
# 对于频繁更新的进度,可以考虑使用logger.debug或更细粒度的控制,避免日志文件过大
|
# For frequently updated progress, consider using logger.debug or more granular control to avoid large log files
|
||||||
# 或者只在控制台输出进度,不写入文件
|
# Or only output progress to console, not write to file
|
||||||
print(f"\r[PROGRESS] {progress:.2f}% ({bytes_written/1024:.2f}KB/{total_size/1024:.2f}KB)", end='')
|
print(f"\r[PROGRESS] {progress:.2f}% ({bytes_written/1024:.2f}KB/{total_size/1024:.2f}KB)", end='')
|
||||||
pass # 避免在日志文件中打印过多进度信息
|
pass # Avoid printing too much progress information in log files
|
||||||
|
|
||||||
if total_size > 0:
|
if total_size > 0:
|
||||||
# print() # 原始的换行符
|
# print() # Original newline
|
||||||
pass
|
pass
|
||||||
print(f"Download completed in {time.time()-start_time:.2f} seconds")
|
print(f"Download completed in {time.time()-start_time:.2f} seconds")
|
||||||
print(f"File saved as: {os.path.abspath(local_filename)}")
|
print(f"File saved as: {os.path.abspath(local_filename)}")
|
||||||
|
|||||||
@@ -2,18 +2,18 @@ from collections import OrderedDict
|
|||||||
import pyJianYingDraft as draft
|
import pyJianYingDraft as draft
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
|
||||||
# 修改全局变量,使用OrderedDict实现LRU缓存,限制最大数量为10000
|
# Modify global variable, use OrderedDict to implement LRU cache, limit the maximum number to 10000
|
||||||
DRAFT_CACHE: Dict[str, 'draft.Script_file'] = OrderedDict() # 使用 Dict 进行类型提示
|
DRAFT_CACHE: Dict[str, 'draft.Script_file'] = OrderedDict() # Use Dict for type hinting
|
||||||
MAX_CACHE_SIZE = 10000
|
MAX_CACHE_SIZE = 10000
|
||||||
|
|
||||||
def update_cache(key: str, value: draft.Script_file) -> None:
|
def update_cache(key: str, value: draft.Script_file) -> None:
|
||||||
"""更新LRU缓存"""
|
"""Update LRU cache"""
|
||||||
if key in DRAFT_CACHE:
|
if key in DRAFT_CACHE:
|
||||||
# 如果键存在,删除旧的项
|
# If the key exists, delete the old item
|
||||||
DRAFT_CACHE.pop(key)
|
DRAFT_CACHE.pop(key)
|
||||||
elif len(DRAFT_CACHE) >= MAX_CACHE_SIZE:
|
elif len(DRAFT_CACHE) >= MAX_CACHE_SIZE:
|
||||||
print(f"{key}, 缓存已满,删除最久未使用的项")
|
print(f"{key}, Cache is full, deleting the least recently used item")
|
||||||
# 如果缓存已满,删除最久未使用的项(第一个项)
|
# If the cache is full, delete the least recently used item (the first item)
|
||||||
DRAFT_CACHE.popitem(last=False)
|
DRAFT_CACHE.popitem(last=False)
|
||||||
# 添加新项到末尾(最近使用)
|
# Add new item to the end (most recently used)
|
||||||
DRAFT_CACHE[key] = value
|
DRAFT_CACHE[key] = value
|
||||||
Reference in New Issue
Block a user