import os import subprocess import time import requests import shutil from requests.exceptions import RequestException, Timeout from urllib.parse import urlparse, unquote def download_video(video_url, draft_name, material_name): """ Download video to specified directory :param video_url: Video URL :param draft_name: Draft name :param material_name: Material name :return: Local video path """ # Ensure directory exists video_dir = f"{draft_name}/assets/video" os.makedirs(video_dir, exist_ok=True) # Generate local filename local_path = f"{video_dir}/{material_name}" # Check if file already exists if os.path.exists(local_path): print(f"Video file already exists: {local_path}") return local_path try: # Use ffmpeg to download video command = [ 'ffmpeg', '-i', video_url, '-c', 'copy', # Direct copy, no re-encoding local_path ] subprocess.run(command, check=True, capture_output=True) return local_path except subprocess.CalledProcessError as e: raise Exception(f"Failed to download video: {e.stderr.decode('utf-8')}") def download_image(image_url, draft_name, material_name): """ Download image to specified directory, and convert to PNG format :param image_url: Image URL :param draft_name: Draft name :param material_name: Material name :return: Local image path """ # Ensure directory exists image_dir = f"{draft_name}/assets/image" os.makedirs(image_dir, exist_ok=True) # Uniformly use png format local_path = f"{image_dir}/{material_name}" # Check if file already exists if os.path.exists(local_path): print(f"Image file already exists: {local_path}") return local_path try: # Use ffmpeg to download and convert image to PNG format command = [ 'ffmpeg', '-i', image_url, '-vf', 'format=rgba', # Convert to RGBA format to support transparency '-frames:v', '1', # Ensure only one frame is processed '-y', # Overwrite existing files local_path ] subprocess.run(command, check=True, capture_output=True) return local_path except subprocess.CalledProcessError as e: raise Exception(f"Failed to download image: {e.stderr.decode('utf-8')}") def download_audio(audio_url, draft_name, material_name): """ Download audio and transcode to MP3 format to specified directory :param audio_url: Audio URL :param draft_name: Draft name :param material_name: Material name :return: Local audio path """ # Ensure directory exists audio_dir = f"{draft_name}/assets/audio" os.makedirs(audio_dir, exist_ok=True) # Generate local filename (keep .mp3 extension) local_path = f"{audio_dir}/{material_name}" # Check if file already exists if os.path.exists(local_path): print(f"Audio file already exists: {local_path}") return local_path try: # Use ffmpeg to download and transcode to MP3 (key modification: specify MP3 encoder) command = [ 'ffmpeg', '-i', audio_url, # Input URL '-c:a', 'libmp3lame', # Force encode audio stream to MP3 '-q:a', '2', # Set audio quality (0-9, 0 is best, 2 balances quality and file size) '-y', # Overwrite existing files (optional) local_path # Output path ] subprocess.run(command, check=True, capture_output=True, text=True) return local_path except subprocess.CalledProcessError as e: raise Exception(f"Failed to download audio:\n{e.stderr}") def download_file(url:str, local_filename, max_retries=3, timeout=180): # 检查是否是本地文件路径 if os.path.exists(url) and os.path.isfile(url): # 是本地文件,直接复制 directory = os.path.dirname(local_filename) # 创建目标目录(如果不存在) if directory and not os.path.exists(directory): os.makedirs(directory, exist_ok=True) print(f"Created directory: {directory}") print(f"Copying local file: {url} to {local_filename}") start_time = time.time() # 复制文件 shutil.copy2(url, local_filename) print(f"Copy completed in {time.time()-start_time:.2f} seconds") print(f"File saved as: {os.path.abspath(local_filename)}") return True # 原有的下载逻辑 # Extract directory part directory = os.path.dirname(local_filename) retries = 0 while retries < max_retries: try: if retries > 0: wait_time = 2 ** retries # Exponential backoff strategy print(f"Retrying in {wait_time} seconds... (Attempt {retries+1}/{max_retries})") time.sleep(wait_time) print(f"Downloading file: {local_filename}") start_time = time.time() # Create directory (if it doesn't exist) if directory and not os.path.exists(directory): os.makedirs(directory, exist_ok=True) print(f"Created directory: {directory}") with requests.get(url, stream=True, timeout=timeout) as response: response.raise_for_status() total_size = int(response.headers.get('content-length', 0)) block_size = 1024 with open(local_filename, 'wb') as file: bytes_written = 0 for chunk in response.iter_content(block_size): if chunk: file.write(chunk) bytes_written += len(chunk) if total_size > 0: progress = bytes_written / total_size * 100 # For frequently updated progress, consider using logger.debug or more granular control to avoid large log files # Or only output progress to console, not write to file print(f"\r[PROGRESS] {progress:.2f}% ({bytes_written/1024:.2f}KB/{total_size/1024:.2f}KB)", end='') pass # Avoid printing too much progress information in log files if total_size > 0: # print() # Original newline pass print(f"Download completed in {time.time()-start_time:.2f} seconds") print(f"File saved as: {os.path.abspath(local_filename)}") return True except Timeout: print(f"Download timed out after {timeout} seconds") except RequestException as e: print(f"Request failed: {e}") except Exception as e: print(f"Unexpected error during download: {e}") retries += 1 print(f"Download failed after {max_retries} attempts for URL: {url}") return False