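"""LLM helpers for generating a video script and stock-video search terms via the configured provider."""
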
import json
import re
from typing import List

from loguru import logger
from openai import AzureOpenAI, OpenAI
from openai.types.chat import ChatCompletion

from app.config import config


def _generate_response(prompt: str) -> str:
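    """Send a single-turn prompt to the configured LLM provider and return its reply with newlines stripped."""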
    content = ""
    llm_provider = config.app.get("llm_provider", "openai")
    logger.info(f"llm provider: {llm_provider}")

    if llm_provider == "g4f":
        # g4f is configured without an API key here; fall back to a default model name if none is set.
        model_name = config.app.get("g4f_model_name", "")
        if not model_name:
            model_name = "gpt-3.5-turbo-16k-0613"
        import g4f

        content = g4f.ChatCompletion.create(
            model=model_name,
            messages=[{"role": "user", "content": prompt}],
        )
    else:
        api_version = ""  # for azure
        if llm_provider == "moonshot":
            api_key = config.app.get("moonshot_api_key")
            model_name = config.app.get("moonshot_model_name")
            base_url = "https://api.moonshot.cn/v1"
        elif llm_provider == "ollama":
            # api_key = config.app.get("openai_api_key")
            api_key = "ollama"  # any non-empty string works, but the OpenAI client requires one
            model_name = config.app.get("ollama_model_name")
            base_url = config.app.get("ollama_base_url", "")
            if not base_url:
                base_url = "http://localhost:11434/v1"
        elif llm_provider == "openai":
            api_key = config.app.get("openai_api_key")
            model_name = config.app.get("openai_model_name")
            base_url = config.app.get("openai_base_url", "")
            if not base_url:
                base_url = "https://api.openai.com/v1"
        elif llm_provider == "oneapi":
            api_key = config.app.get("oneapi_api_key")
            model_name = config.app.get("oneapi_model_name")
            base_url = config.app.get("oneapi_base_url", "")
        elif llm_provider == "azure":
            api_key = config.app.get("azure_api_key")
            model_name = config.app.get("azure_model_name")
            base_url = config.app.get("azure_base_url", "")
            api_version = config.app.get("azure_api_version", "2024-02-15-preview")
        elif llm_provider == "gemini":
            api_key = config.app.get("gemini_api_key")
            model_name = config.app.get("gemini_model_name")
            base_url = "***"  # placeholder: gemini does not use an OpenAI-style base_url
        elif llm_provider == "qwen":
            api_key = config.app.get("qwen_api_key")
            model_name = config.app.get("qwen_model_name")
            base_url = "***"  # placeholder: qwen is called through dashscope, not an OpenAI-style base_url
        elif llm_provider == "cloudflare":
            api_key = config.app.get("cloudflare_api_key")
            model_name = config.app.get("cloudflare_model_name")
            account_id = config.app.get("cloudflare_account_id")
            base_url = "***"  # placeholder: cloudflare uses its own REST endpoint
        else:
            raise ValueError(f"llm_provider \"{llm_provider}\" is not supported, please set a valid llm_provider in the config.toml file.")

        if not api_key:
            raise ValueError(f"{llm_provider}: api_key is not set, please set it in the config.toml file.")
        if not model_name:
            raise ValueError(f"{llm_provider}: model_name is not set, please set it in the config.toml file.")
        if not base_url:
            raise ValueError(f"{llm_provider}: base_url is not set, please set it in the config.toml file.")

        if llm_provider == "qwen":
            import dashscope

            dashscope.api_key = api_key
            response = dashscope.Generation.call(
                model=model_name,
                messages=[{"role": "user", "content": prompt}],
            )
            content = response["output"]["text"]
            return content.replace("\n", "")

        if llm_provider == "gemini":
            import google.generativeai as genai

            genai.configure(api_key=api_key)

            generation_config = {
                "temperature": 0.5,
                "top_p": 1,
                "top_k": 1,
                "max_output_tokens": 2048,
            }

            safety_settings = [
                {
                    "category": "HARM_CATEGORY_HARASSMENT",
                    "threshold": "BLOCK_ONLY_HIGH",
                },
                {
                    "category": "HARM_CATEGORY_HATE_SPEECH",
                    "threshold": "BLOCK_ONLY_HIGH",
                },
                {
                    "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                    "threshold": "BLOCK_ONLY_HIGH",
                },
                {
                    "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                    "threshold": "BLOCK_ONLY_HIGH",
                },
            ]

            model = genai.GenerativeModel(
                model_name=model_name,
                generation_config=generation_config,
                safety_settings=safety_settings,
            )

            convo = model.start_chat(history=[])
            convo.send_message(prompt)
            return convo.last.text

        if llm_provider == "cloudflare":
            import requests

            response = requests.post(
                f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model_name}",
                headers={"Authorization": f"Bearer {api_key}"},
                json={
                    "messages": [
                        {"role": "system", "content": "You are a friendly assistant"},
                        {"role": "user", "content": prompt},
                    ]
                },
            )
            result = response.json()
            logger.info(result)
            return result["result"]["response"]

        if llm_provider == "azure":
            client = AzureOpenAI(
                api_key=api_key,
                api_version=api_version,
                azure_endpoint=base_url,
            )
        else:
            client = OpenAI(
                api_key=api_key,
                base_url=base_url,
            )

        response = client.chat.completions.create(
            model=model_name,
            messages=[{"role": "user", "content": prompt}],
        )
        if response:
            if isinstance(response, ChatCompletion):
                content = response.choices[0].message.content
            else:
                raise Exception(
                    f"[{llm_provider}] returned an invalid response: \"{response}\", please check your network "
                    f"connection and try again."
                )
        else:
            raise Exception(
                f"[{llm_provider}] returned an empty response, please check your network connection and try again."
            )

    return content.replace("\n", "")


def generate_script(video_subject: str, language: str = "", paragraph_number: int = 1) -> str:
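    """Generate a video script for the given subject, cleaned of markdown and trimmed to at most `paragraph_number` paragraphs."""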
    prompt = f"""
# Role: Video Script Generator

## Goals:
Generate a script for a video, depending on the subject of the video.

## Constraints:
1. the script is to be returned as a string with the specified number of paragraphs.
2. do not under any circumstance reference this prompt in your response.
3. get straight to the point, don't start with unnecessary things like "welcome to this video".
4. you must not include any type of markdown or formatting in the script, never use a title.
5. only return the raw content of the script.
6. do not include "voiceover", "narrator" or similar indicators of what should be spoken at the beginning of each paragraph or line.
7. you must not mention the prompt, or anything about the script itself. also, never talk about the amount of paragraphs or lines. just write the script.
8. respond in the same language as the video subject.

# Initialization:
- video subject: {video_subject}
- number of paragraphs: {paragraph_number}
""".strip()
    if language:
        prompt += f"\n- language: {language}"

    final_script = ""
    logger.info(f"subject: {video_subject}")
    logger.debug(f"prompt: \n{prompt}")
    response = _generate_response(prompt=prompt)

    # Return the generated script
    if response:
        # Clean the script: remove asterisks and hashes
        response = response.replace("*", "")
        response = response.replace("#", "")

        # Remove markdown link syntax (bracketed and parenthesized text)
        response = re.sub(r"\[.*\]", "", response)
        response = re.sub(r"\(.*\)", "", response)

        # Split the script into paragraphs
        paragraphs = response.split("\n\n")

        # Select the specified number of paragraphs
        selected_paragraphs = paragraphs[:paragraph_number]

        # Join the selected paragraphs into a single string
        final_script = "\n\n".join(selected_paragraphs)

        # Print to console the number of paragraphs used
        # logger.info(f"number of paragraphs used: {len(selected_paragraphs)}")
    else:
        logger.error("gpt returned an empty response")

    logger.success(f"completed: \n{final_script}")
    return final_script


def generate_terms(video_subject: str, video_script: str, amount: int = 5) -> List[str]:
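    """Ask the LLM for stock-video search terms and return them as a list of strings (empty list if the response cannot be parsed)."""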
    prompt = f"""
# Role: Video Search Terms Generator

## Goals:
Generate {amount} search terms for stock videos, depending on the subject of a video.

## Constraints:
1. the search terms are to be returned as a json-array of strings.
2. each search term should consist of 1-3 words, always add the main subject of the video.
3. you must only return the json-array of strings. you must not return anything else. you must not return the script.
4. the search terms must be related to the subject of the video.
5. reply with english search terms only.

## Output Example:
["search term 1", "search term 2", "search term 3", "search term 4", "search term 5"]

## Context:
### Video Subject
{video_subject}

### Video Script
{video_script}

Please note that you must use English for generating video search terms; Chinese is not accepted.
""".strip()

    logger.info(f"subject: {video_subject}")
    logger.debug(f"prompt: \n{prompt}")
    response = _generate_response(prompt)
    search_terms = []

    try:
        search_terms = json.loads(response)
        if not isinstance(search_terms, list) or not all(isinstance(term, str) for term in search_terms):
            raise ValueError("response is not a list of strings.")
    except (json.JSONDecodeError, ValueError):
        search_terms = []  # discard any non-list value that json.loads may have produced
        # logger.warning(f"gpt returned an unformatted response. attempting to clean...")
        # Attempt to extract a list-like string from the raw response and parse it as JSON
        match = re.search(r'\["(?:[^"\\]|\\.)*"(?:,\s*"[^"\\]*")*\]', response)
        if match:
            try:
                search_terms = json.loads(match.group())
            except json.JSONDecodeError:
                logger.error(f"could not parse response: {response}")
                return []

    logger.success(f"completed: \n{search_terms}")
    return search_terms


if __name__ == "__main__":
    video_subject = "生命的意义是什么"  # "What is the meaning of life?"
    script = generate_script(video_subject=video_subject, language="zh-CN", paragraph_number=1)
    # print("######################")
    # print(script)

    # search_terms = generate_terms(video_subject=video_subject, video_script=script, amount=5)
    # print("######################")
    # print(search_terms)