🎨 style: Format Code

This commit is contained in:
yyhhyyyyyy
2024-12-10 10:34:56 +08:00
parent 809d6cabbb
commit afd064e15d
17 changed files with 123 additions and 95 deletions

View File

@@ -4,10 +4,10 @@ import os
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from loguru import logger
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from loguru import logger
from app.config import config
from app.models.exception import HttpException

View File

@@ -1,7 +1,8 @@
import os
import socket
import toml
import shutil
import socket
import toml
from loguru import logger
root_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
@@ -17,7 +18,7 @@ def load_config():
example_file = f"{root_dir}/config.example.toml"
if os.path.isfile(example_file):
shutil.copyfile(example_file, config_file)
logger.info(f"copy config.example.toml to config.toml")
logger.info("copy config.example.toml to config.toml")
logger.info(f"load config from file: {config_file}")

View File

@@ -1,5 +1,5 @@
import threading
from typing import Callable, Any, Dict
from typing import Any, Callable, Dict
class TaskManager:
@@ -33,7 +33,7 @@ class TaskManager:
try:
with self.lock:
self.current_tasks += 1
func(*args, **kwargs) # 在这里调用函数,传递*args和**kwargs
func(*args, **kwargs) # call the function here, passing *args and **kwargs.
finally:
self.task_done()

View File

@@ -1,5 +1,4 @@
from fastapi import APIRouter
from fastapi import Request
from fastapi import APIRouter, Request
router = APIRouter()

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends
from fastapi import APIRouter
def new_router(dependencies=None):

View File

@@ -1,15 +1,16 @@
from fastapi import Request
from app.controllers.v1.base import new_router
from app.models.schema import (
VideoScriptResponse,
VideoScriptRequest,
VideoTermsResponse,
VideoScriptResponse,
VideoTermsRequest,
VideoTermsResponse,
)
from app.services import llm
from app.utils import utils
# 认证依赖项
# authentication dependency
# router = new_router(dependencies=[Depends(base.verify_token)])
router = new_router()

View File

@@ -11,7 +11,7 @@ class HttpException(Exception):
self.message = message
self.status_code = status_code
self.data = data
# 获取异常堆栈信息
# Retrieve the exception stack trace information.
tb_str = traceback.format_exc().strip()
if not tb_str or tb_str == "NoneType: None":
msg = f"HttpException: {status_code}, {task_id}, {message}"

View File

@@ -98,15 +98,15 @@ class VideoParams(BaseModel):
"""
video_subject: str
video_script: str = "" # 用于生成视频的脚本
video_terms: Optional[str | list] = None # 用于生成视频的关键词
video_script: str = "" # Script used to generate the video
video_terms: Optional[str | list] = None # Keywords used to generate the video
video_aspect: Optional[VideoAspect] = VideoAspect.portrait.value
video_concat_mode: Optional[VideoConcatMode] = VideoConcatMode.random.value
video_clip_duration: Optional[int] = 5
video_count: Optional[int] = 1
video_source: Optional[str] = "pexels"
video_materials: Optional[List[MaterialInfo]] = None # 用于生成视频的素材
video_materials: Optional[List[MaterialInfo]] = None # Materials used to generate the video
video_language: Optional[str] = "" # auto detect

View File

@@ -1,10 +1,10 @@
import json
import logging
import re
import json
from typing import List
from loguru import logger
from openai import OpenAI
from openai import AzureOpenAI
from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion
from app.config import config
@@ -295,7 +295,7 @@ Generate a script for a video, depending on the subject of the video.
paragraphs = response.split("\n\n")
# Select the specified number of paragraphs
selected_paragraphs = paragraphs[:paragraph_number]
# selected_paragraphs = paragraphs[:paragraph_number]
# Join the selected paragraphs into a single string
return "\n\n".join(paragraphs)

View File

@@ -1,14 +1,14 @@
import os
import random
from typing import List
from urllib.parse import urlencode
import requests
from typing import List
from loguru import logger
from moviepy.video.io.VideoFileClip import VideoFileClip
from app.config import config
from app.models.schema import VideoAspect, VideoConcatMode, MaterialInfo
from app.models.schema import MaterialInfo, VideoAspect, VideoConcatMode
from app.utils import utils
requested_count = 0
@@ -42,7 +42,7 @@ def search_videos_pexels(
api_key = get_api_key("pexels_api_keys")
headers = {
"Authorization": api_key,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
}
# Build URL
params = {"query": search_term, "per_page": 20, "orientation": video_orientation}
@@ -129,7 +129,7 @@ def search_videos_pixabay(
for video_type in video_files:
video = video_files[video_type]
w = int(video["width"])
h = int(video["height"])
# h = int(video["height"])
if w >= video_width:
item = MaterialInfo()
item.provider = "pixabay"
@@ -169,7 +169,11 @@ def save_video(video_url: str, save_dir: str = "") -> str:
with open(video_path, "wb") as f:
f.write(
requests.get(
video_url, headers=headers, proxies=config.proxy, verify=False, timeout=(60, 240)
video_url,
headers=headers,
proxies=config.proxy,
verify=False,
timeout=(60, 240),
).content
)
@@ -184,7 +188,7 @@ def save_video(video_url: str, save_dir: str = "") -> str:
except Exception as e:
try:
os.remove(video_path)
except Exception as e:
except Exception:
pass
logger.warning(f"invalid video file: {video_path} => {str(e)}")
return ""

View File

@@ -1,5 +1,6 @@
import ast
from abc import ABC, abstractmethod
from app.config import config
from app.models import const

View File

@@ -1,9 +1,9 @@
import json
import os.path
import re
from timeit import default_timer as timer
from faster_whisper import WhisperModel
from timeit import default_timer as timer
from loguru import logger
from app.config import config
@@ -88,7 +88,7 @@ def create(audio_file, subtitle_file: str = ""):
is_segmented = True
seg_end = word.end
# 如果包含标点,则断句
# If it contains punctuation, then break the sentence.
seg_text += word.word
if utils.str_contains_punctuation(word.word):
@@ -246,7 +246,7 @@ def correct(subtitle_file, video_script):
script_index += 1
subtitle_index = next_subtitle_index
# 处理剩余的脚本行
# Process the remaining lines of the script.
while script_index < len(script_lines):
logger.warning(f"Extra script line: {script_lines[script_index]}")
if subtitle_index < len(subtitle_items):

View File

@@ -3,7 +3,6 @@ import os.path
import re
from os import path
from edge_tts import SubMaker
from loguru import logger
from app.config import config
@@ -158,7 +157,7 @@ def get_video_materials(task_id, params, video_terms, audio_duration):
def generate_final_videos(
task_id, params, downloaded_videos, audio_file, subtitle_path
task_id, params, downloaded_videos, audio_file, subtitle_path
):
final_video_paths = []
combined_video_paths = []
@@ -212,7 +211,7 @@ def start(task_id, params: VideoParams, stop_at: str = "video"):
if type(params.video_concat_mode) is str:
params.video_concat_mode = VideoConcatMode(params.video_concat_mode)
# 1. Generate script
video_script = generate_script(task_id, params)
if not video_script:
@@ -246,7 +245,9 @@ def start(task_id, params: VideoParams, stop_at: str = "video"):
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=20)
# 3. Generate audio
audio_file, audio_duration, sub_maker = generate_audio(task_id, params, video_script)
audio_file, audio_duration, sub_maker = generate_audio(
task_id, params, video_script
)
if not audio_file:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
return
@@ -263,7 +264,9 @@ def start(task_id, params: VideoParams, stop_at: str = "video"):
return {"audio_file": audio_file, "audio_duration": audio_duration}
# 4. Generate subtitle
subtitle_path = generate_subtitle(task_id, params, video_script, sub_maker, audio_file)
subtitle_path = generate_subtitle(
task_id, params, video_script, sub_maker, audio_file
)
if stop_at == "subtitle":
sm.state.update_task(
@@ -330,6 +333,5 @@ if __name__ == "__main__":
video_subject="金钱的作用",
voice_name="zh-CN-XiaoyiNeural-Female",
voice_rate=1.0,
)
start(task_id, params, stop_at="video")

View File

@@ -4,7 +4,17 @@ import random
from typing import List
from loguru import logger
from moviepy import *
from moviepy import (
AudioFileClip,
ColorClip,
CompositeAudioClip,
CompositeVideoClip,
ImageClip,
TextClip,
VideoFileClip,
afx,
concatenate_videoclips,
)
from moviepy.video.tools.subtitles import SubtitlesClip
from PIL import ImageFont
@@ -90,15 +100,15 @@ def combine_videos(
video_ratio = video_width / video_height
if clip_ratio == video_ratio:
# 等比例缩放
# Resize proportionally
clip = clip.resized((video_width, video_height))
else:
# 等比缩放视频
# Resize proportionally
if clip_ratio > video_ratio:
# 按照目标宽度等比缩放
# Resize proportionally based on the target width
scale_factor = video_width / clip_w
else:
# 按照目标高度等比缩放
# Resize proportionally based on the target height
scale_factor = video_height / clip_h
new_width = int(clip_w * scale_factor)
@@ -143,7 +153,7 @@ def combine_videos(
def wrap_text(text, max_width, font="Arial", fontsize=60):
# 创建字体对象
# Create ImageFont
font = ImageFont.truetype(font, fontsize)
def get_text_size(inner_text):
@@ -257,12 +267,14 @@ def generate_video(
elif params.subtitle_position == "top":
_clip = _clip.with_position(("center", video_height * 0.05))
elif params.subtitle_position == "custom":
# 确保字幕完全在屏幕内
margin = 10 # 额外的边距,单位为像素
# Ensure the subtitle is fully within the screen bounds
margin = 10 # Additional margin, in pixels
max_y = video_height - _clip.h - margin
min_y = margin
custom_y = (video_height - _clip.h) * (params.custom_position / 100)
custom_y = max(min_y, min(custom_y, max_y)) # 限制 y 值在有效范围内
custom_y = max(
min_y, min(custom_y, max_y)
) # Constrain the y value within the valid range
_clip = _clip.with_position(("center", custom_y))
else: # center
_clip = _clip.with_position(("center", "center"))
@@ -273,14 +285,16 @@ def generate_video(
[afx.MultiplyVolume(params.voice_volume)]
)
if subtitle_path and os.path.exists(subtitle_path):
generator = lambda text: TextClip(
def make_textclip(text):
return TextClip(
text=text,
font=font_path,
font_size=params.font_size,
)
if subtitle_path and os.path.exists(subtitle_path):
sub = SubtitlesClip(
subtitles=subtitle_path, encoding="utf-8", make_textclip=generator
subtitles=subtitle_path, encoding="utf-8", make_textclip=make_textclip
)
text_clips = []
for item in sub.subtitles:
@@ -335,25 +349,26 @@ def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
if ext in const.FILE_TYPE_IMAGES:
logger.info(f"processing image: {material.url}")
# 创建一个图片剪辑并设置持续时间为3秒钟
# Create an image clip and set its duration to 3 seconds
clip = (
ImageClip(material.url)
.with_duration(clip_duration)
.with_position("center")
)
# 使用resize方法来添加缩放效果。这里使用了lambda函数来使得缩放效果随时间变化。
# 假设我们想要从原始大小逐渐放大到120%的大小。
# t代表当前时间clip.duration为视频总时长这里是3秒。
# 注意1 表示100%的大小所以1.2表示120%的大小
# Apply a zoom effect using the resize method.
# A lambda function is used to make the zoom effect dynamic over time.
# The zoom effect starts from the original size and gradually scales up to 120%.
# t represents the current time, and clip.duration is the total duration of the clip (3 seconds).
# Note: 1 represents 100% size, so 1.2 represents 120% size.
zoom_clip = clip.resized(
lambda t: 1 + (clip_duration * 0.03) * (t / clip.duration)
)
# 如果需要,可以创建一个包含缩放剪辑的复合视频剪辑
# (这在您想要在视频中添加其他元素时非常有用)
# Optionally, create a composite video clip containing the zoomed clip.
# This is useful when you want to add other elements to the video.
final_clip = CompositeVideoClip([zoom_clip])
# 输出视频
# Output the video to a file.
video_file = f"{material.url}.mp4"
final_clip.write_videofile(video_file, fps=30, logger=None)
final_clip.close()

View File

@@ -2,11 +2,13 @@ import asyncio
import os
import re
from datetime import datetime
from typing import Union
from xml.sax.saxutils import unescape
import edge_tts
from edge_tts import SubMaker, submaker
from edge_tts.submaker import mktimestamp
from loguru import logger
from edge_tts import submaker, SubMaker
import edge_tts
from moviepy.video.tools import subtitles
from app.config import config
@@ -1054,7 +1056,7 @@ def is_azure_v2_voice(voice_name: str):
def tts(
text: str, voice_name: str, voice_rate: float, voice_file: str
) -> [SubMaker, None]:
) -> Union[SubMaker, None]:
if is_azure_v2_voice(voice_name):
return azure_tts_v2(text, voice_name, voice_file)
return azure_tts_v1(text, voice_name, voice_rate, voice_file)
@@ -1072,7 +1074,7 @@ def convert_rate_to_percent(rate: float) -> str:
def azure_tts_v1(
text: str, voice_name: str, voice_rate: float, voice_file: str
) -> [SubMaker, None]:
) -> Union[SubMaker, None]:
voice_name = parse_voice_name(voice_name)
text = text.strip()
rate_str = convert_rate_to_percent(voice_rate)
@@ -1095,7 +1097,7 @@ def azure_tts_v1(
sub_maker = asyncio.run(_do())
if not sub_maker or not sub_maker.subs:
logger.warning(f"failed, sub_maker is None or sub_maker.subs is None")
logger.warning("failed, sub_maker is None or sub_maker.subs is None")
continue
logger.info(f"completed, output file: {voice_file}")
@@ -1105,7 +1107,7 @@ def azure_tts_v1(
return None
def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> [SubMaker, None]:
def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> Union[SubMaker, None]:
voice_name = is_azure_v2_voice(voice_name)
if not voice_name:
logger.error(f"invalid voice name: {voice_name}")

View File

@@ -1,12 +1,12 @@
import json
import locale
import os
import platform
import threading
from typing import Any
from loguru import logger
import json
from uuid import uuid4
import urllib3
from loguru import logger
from app.models import const
@@ -26,33 +26,33 @@ def get_response(status: int, data: Any = None, message: str = ""):
def to_json(obj):
try:
# 定义一个辅助函数来处理不同类型的对象
# Define a helper function to handle different types of objects
def serialize(o):
# 如果对象是可序列化类型,直接返回
# If the object is a serializable type, return it directly
if isinstance(o, (int, float, bool, str)) or o is None:
return o
# 如果对象是二进制数据转换为base64编码的字符串
# If the object is binary data, convert it to a base64-encoded string
elif isinstance(o, bytes):
return "*** binary data ***"
# 如果对象是字典,递归处理每个键值对
# If the object is a dictionary, recursively process each key-value pair
elif isinstance(o, dict):
return {k: serialize(v) for k, v in o.items()}
# 如果对象是列表或元组,递归处理每个元素
# If the object is a list or tuple, recursively process each element
elif isinstance(o, (list, tuple)):
return [serialize(item) for item in o]
# 如果对象是自定义类型尝试返回其__dict__属性
# If the object is a custom type, attempt to return its __dict__ attribute
elif hasattr(o, "__dict__"):
return serialize(o.__dict__)
# 其他情况返回None或者可以选择抛出异常
# Return None for other cases (or choose to raise an exception)
else:
return None
# 使用serialize函数处理输入对象
# Use the serialize function to process the input object
serialized_obj = serialize(obj)
# 序列化处理后的对象为JSON字符串
# Serialize the processed object into a JSON string
return json.dumps(serialized_obj, ensure_ascii=False, indent=4)
except Exception as e:
except Exception:
return None
@@ -94,7 +94,7 @@ def task_dir(sub_dir: str = ""):
def font_dir(sub_dir: str = ""):
d = resource_dir(f"fonts")
d = resource_dir("fonts")
if sub_dir:
d = os.path.join(d, sub_dir)
if not os.path.exists(d):
@@ -103,7 +103,7 @@ def font_dir(sub_dir: str = ""):
def song_dir(sub_dir: str = ""):
d = resource_dir(f"songs")
d = resource_dir("songs")
if sub_dir:
d = os.path.join(d, sub_dir)
if not os.path.exists(d):
@@ -112,7 +112,7 @@ def song_dir(sub_dir: str = ""):
def public_dir(sub_dir: str = ""):
d = resource_dir(f"public")
d = resource_dir("public")
if sub_dir:
d = os.path.join(d, sub_dir)
if not os.path.exists(d):
@@ -182,7 +182,7 @@ def split_string_by_punctuations(s):
next_char = s[i + 1]
if char == "." and previous_char.isdigit() and next_char.isdigit():
# 取现1万按2.5%收取手续费, 2.5 中的 . 不能作为换行标记
# # In the case of "withdraw 10,000, charged at 2.5% fee", the dot in "2.5" should not be treated as a line break marker
txt += char
continue
@@ -210,7 +210,7 @@ def get_system_locale():
# en_US, en_GB return en
language_code = loc[0].split("_")[0]
return language_code
except Exception as e:
except Exception:
return "en"

View File

@@ -1,5 +1,17 @@
import os
import platform
import sys
from uuid import uuid4
import streamlit as st
from loguru import logger
from app.config import config
from app.models.const import FILE_TYPE_IMAGES, FILE_TYPE_VIDEOS
from app.models.schema import MaterialInfo, VideoAspect, VideoConcatMode, VideoParams
from app.services import llm, voice
from app.services import task as tm
from app.utils import utils
# Add the root directory of the project to the system path to allow importing modules from the project
root_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
@@ -9,13 +21,6 @@ if root_dir not in sys.path:
print(sys.path)
print("")
import os
import platform
from uuid import uuid4
import streamlit as st
from loguru import logger
st.set_page_config(
page_title="MoneyPrinterTurbo",
page_icon="🤖",
@@ -30,12 +35,6 @@ st.set_page_config(
},
)
from app.config import config
from app.models.const import FILE_TYPE_IMAGES, FILE_TYPE_VIDEOS
from app.models.schema import MaterialInfo, VideoAspect, VideoConcatMode, VideoParams
from app.services import llm, voice
from app.services import task as tm
from app.utils import utils
hide_streamlit_style = """
<style>#root > div:nth-child(1) > div > div > div > div > section > div {padding-top: 0rem;}</style>
@@ -734,7 +733,11 @@ if start_button:
scroll_to_bottom()
st.stop()
if llm_provider != "g4f" and llm_provider != 'ollama' and not config.app.get(f"{llm_provider}_api_key", ""):
if (
llm_provider != "g4f"
and llm_provider != "ollama"
and not config.app.get(f"{llm_provider}_api_key", "")
):
st.error(tr("Please Enter the LLM API Key"))
scroll_to_bottom()
st.stop()