fix: live notification

This commit is contained in:
Xinrea
2025-10-24 22:51:33 +08:00
parent 39a8d0d741
commit 42c12b3bf9
6 changed files with 36 additions and 1119 deletions

View File

@@ -1,116 +0,0 @@
use std::{
path::{Path, PathBuf},
process::Stdio,
};
use async_ffmpeg_sidecar::{event::FfmpegEvent, log_parser::FfmpegLogParser};
use tokio::io::{AsyncWriteExt, BufReader};
use super::ffmpeg_path;
#[cfg(target_os = "windows")]
const CREATE_NO_WINDOW: u32 = 0x08000000;
#[cfg(target_os = "windows")]
#[allow(unused_imports)]
use std::os::windows::process::CommandExt;
/// Generate a random filename in hex
pub async fn random_filename() -> String {
format!("{:x}", rand::random::<u64>())
}
pub async fn handle_ffmpeg_process(
ffmpeg_process: &mut tokio::process::Command,
) -> Result<(), String> {
let child = ffmpeg_process
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn();
if let Err(e) = child {
return Err(e.to_string());
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Log(_level, content) => {
// if contains "out_time_ms=66654667", by the way, it's actually in us
if content.starts_with("out_time_ms") {
let time_str = content.strip_prefix("out_time_ms=").unwrap_or_default();
}
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Error(e) => {
log::error!("[FFmpeg Error] {}", e);
return Err(e);
}
_ => {}
}
}
if let Err(e) = child.wait().await {
return Err(e.to_string());
}
Ok(())
}
pub async fn concat_videos(videos: &[PathBuf], output_path: &Path) -> Result<(), String> {
// ffmpeg -i input1.mp4 -i input2.mp4 -i input3.mp4 -c copy output.mp4
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
let output_folder = output_path.parent().unwrap();
if !output_folder.exists() {
std::fs::create_dir_all(output_folder).unwrap();
}
let filelist_filename = format!("filelist_{}.txt", random_filename().await);
let mut filelist = tokio::fs::File::create(&output_folder.join(&filelist_filename))
.await
.unwrap();
for video in videos {
filelist
.write_all(format!("file '{}'\n", video.display()).as_bytes())
.await
.unwrap();
}
filelist.flush().await.unwrap();
// Convert &[PathBuf] to &[&Path] for check_videos
let video_refs: Vec<&Path> = videos.iter().map(|p| p.as_path()).collect();
let should_encode = !super::check_videos(&video_refs).await;
ffmpeg_process.args([
"-f",
"concat",
"-safe",
"0",
"-i",
output_folder.join(&filelist_filename).to_str().unwrap(),
]);
if should_encode {
ffmpeg_process.args(["-vf", "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2"]);
ffmpeg_process.args(["-r", "60"]);
ffmpeg_process.args(["-c", "libx264"]);
ffmpeg_process.args(["-c:a", "aac"]);
ffmpeg_process.args(["-b:v", "6000k"]);
ffmpeg_process.args(["-b:a", "128k"]);
ffmpeg_process.args(["-threads", "0"]);
} else {
ffmpeg_process.args(["-c", "copy"]);
}
ffmpeg_process.args([output_path.to_str().unwrap()]);
ffmpeg_process.args(["-progress", "pipe:2"]);
ffmpeg_process.args(["-y"]);
handle_ffmpeg_process(&mut ffmpeg_process).await?;
// clean up filelist
let _ = tokio::fs::remove_file(output_folder.join(&filelist_filename)).await;
Ok(())
}

View File

@@ -1,14 +1,4 @@
use std::fmt;
use std::path::{Path, PathBuf};
use std::process::Stdio;
pub mod general;
pub mod playlist;
use async_ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
use async_ffmpeg_sidecar::log_parser::FfmpegLogParser;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncWriteExt, BufReader};
// 视频元数据结构
#[derive(Debug, Clone)]
@@ -41,390 +31,6 @@ const CREATE_NO_WINDOW: u32 = 0x08000000;
#[allow(unused_imports)]
use std::os::windows::process::CommandExt;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Range {
pub start: f64,
pub end: f64,
}
impl fmt::Display for Range {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[{}, {}]", self.start, self.end)
}
}
impl Range {
pub fn duration(&self) -> f64 {
self.end - self.start
}
pub fn is_in(&self, v: f64) -> bool {
v >= self.start && v <= self.end
}
}
pub async fn transcode(file: &Path, output_path: &Path, copy_codecs: bool) -> Result<(), String> {
// ffmpeg -i fixed_\[30655190\]1742887114_0325084106_81.5.mp4 -c:v libx264 -c:a aac -b:v 6000k -b:a 64k -compression_level 0 -threads 0 output.mp3
log::info!("Transcode: {} copy: {}", file.display(), copy_codecs);
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
ffmpeg_process.args(["-i", file.to_str().unwrap()]);
if copy_codecs {
ffmpeg_process.args(["-c:v", "copy"]).args(["-c:a", "copy"]);
} else {
ffmpeg_process
.args(["-vf", "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2"])
.args(["-c:v", "libx264"])
.args(["-c:a", "aac"])
.args(["-b:v", "6000k"])
.args(["-b:a", "128k"])
.args(["-threads", "0"]);
}
let child = ffmpeg_process
.args([output_path.to_str().unwrap()])
.args(["-y"])
.args(["-progress", "pipe:2"])
.stderr(Stdio::piped())
.spawn();
if let Err(e) = child {
return Err(e.to_string());
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Progress(p) => {
log::info!("压制中:{}", p.time);
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Error(e) => {
log::error!("Transcode error: {e}");
return Err(e.to_string());
}
_ => {}
}
}
if let Err(e) = child.wait().await {
return Err(e.to_string());
}
Ok(())
}
pub async fn trim_video(
file: &Path,
output_path: &Path,
start_time: f64,
duration: f64,
) -> Result<(), String> {
// ffmpeg -i fixed_\[30655190\]1742887114_0325084106_81.5.mp4 -ss 0 -t 10 output.mp4
log::info!("Trim video task start: {}", file.display());
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
ffmpeg_process.args(["-ss", &start_time.to_string()]);
ffmpeg_process.args(["-i", file.to_str().unwrap()]);
ffmpeg_process.args(["-t", &duration.to_string()]);
ffmpeg_process.args(["-c", "copy"]);
ffmpeg_process.args([output_path.to_str().unwrap()]);
ffmpeg_process.args(["-y"]);
ffmpeg_process.args(["-progress", "pipe:2"]);
ffmpeg_process.stderr(Stdio::piped());
let child = ffmpeg_process.spawn();
if let Err(e) = child {
return Err(e.to_string());
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Progress(p) => {
log::info!("切片中:{}", p.time);
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Error(e) => {
log::error!("Trim video error: {e}");
return Err(e.to_string());
}
_ => {}
}
}
if let Err(e) = child.wait().await {
log::error!("Trim video error: {e}");
return Err(e.to_string());
}
log::info!("Trim video task end: {}", output_path.display());
Ok(())
}
pub async fn extract_audio_chunks(file: &Path, format: &str) -> Result<PathBuf, String> {
// ffmpeg -i fixed_\[30655190\]1742887114_0325084106_81.5.mp4 -ar 16000 test.wav
log::info!("Extract audio task start: {}", file.display());
let output_path = file.with_extension(format);
let mut extract_error = None;
// 降低采样率以提高处理速度,同时保持足够的音质用于语音识别
let sample_rate = if format == "mp3" { "22050" } else { "16000" };
// First, get the duration of the input file
let duration = get_audio_duration(file).await?;
log::info!("Audio duration: {duration} seconds");
// Split into chunks of 30 seconds
let chunk_duration = 30;
let chunk_count = (duration as f64 / f64::from(chunk_duration)).ceil() as usize;
log::info!("Splitting into {chunk_count} chunks of {chunk_duration} seconds each");
// Create output directory for chunks
let output_dir = output_path.parent().unwrap();
let base_name = output_path.file_stem().unwrap().to_str().unwrap();
let chunk_dir = output_dir.join(format!("{base_name}_chunks"));
if !chunk_dir.exists() {
std::fs::create_dir_all(&chunk_dir)
.map_err(|e| format!("Failed to create chunk directory: {e}"))?;
}
// Use ffmpeg segment feature to split audio into chunks
let segment_pattern = chunk_dir.join(format!("{base_name}_%03d.{format}"));
// 构建优化的ffmpeg命令参数
let file_str = file.to_str().unwrap();
let chunk_duration_str = chunk_duration.to_string();
let segment_pattern_str = segment_pattern.to_str().unwrap();
let mut args = vec![
"-i",
file_str,
"-ar",
sample_rate,
"-vn",
"-f",
"segment",
"-segment_time",
&chunk_duration_str,
"-reset_timestamps",
"1",
"-y",
"-progress",
"pipe:2",
];
// 根据格式添加优化的编码参数
if format == "mp3" {
args.extend_from_slice(&[
"-c:a",
"mp3",
"-b:a",
"64k", // 降低比特率以提高速度
"-compression_level",
"0", // 最快压缩
]);
} else {
args.extend_from_slice(&[
"-c:a",
"pcm_s16le", // 使用PCM编码速度更快
]);
}
// 添加性能优化参数
args.extend_from_slice(&[
"-threads", "0", // 使用所有可用CPU核心
]);
args.push(segment_pattern_str);
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
let child = ffmpeg_process.args(&args).stderr(Stdio::piped()).spawn();
if let Err(e) = child {
return Err(e.to_string());
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Error(e) => {
log::error!("Extract audio error: {e}");
extract_error = Some(e.to_string());
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Log(_level, _content) => {}
_ => {}
}
}
if let Err(e) = child.wait().await {
log::error!("Extract audio error: {e}");
return Err(e.to_string());
}
if let Some(error) = extract_error {
log::error!("Extract audio error: {error}");
Err(error)
} else {
log::info!(
"Extract audio task end: {} chunks created in {}",
chunk_count,
chunk_dir.display()
);
Ok(chunk_dir)
}
}
/// Get the duration of an audio/video file in seconds
async fn get_audio_duration(file: &Path) -> Result<u64, String> {
// Use ffprobe with format option to get duration
let mut ffprobe_process = tokio::process::Command::new(ffprobe_path());
#[cfg(target_os = "windows")]
ffprobe_process.creation_flags(CREATE_NO_WINDOW);
let child = ffprobe_process
.args(["-v", "quiet"])
.args(["-show_entries", "format=duration"])
.args(["-of", "csv=p=0"])
.args(["-i", file.to_str().unwrap()])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
if let Err(e) = child {
return Err(format!("Failed to spawn ffprobe process: {e}"));
}
let mut child = child.unwrap();
let stdout = child.stdout.take().unwrap();
let reader = BufReader::new(stdout);
let mut parser = FfmpegLogParser::new(reader);
let mut duration = None;
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::LogEOF => break,
FfmpegEvent::Log(_level, content) => {
// The new command outputs duration directly as a float
if let Ok(seconds_f64) = content.trim().parse::<f64>() {
duration = Some(seconds_f64.ceil() as u64);
log::debug!("Parsed duration: {seconds_f64} seconds");
}
}
_ => {}
}
}
if let Err(e) = child.wait().await {
log::error!("Failed to get duration: {e}");
return Err(e.to_string());
}
duration.ok_or_else(|| "Failed to parse duration".to_string())
}
pub async fn generic_ffmpeg_command(args: &[&str]) -> Result<String, String> {
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
let child = ffmpeg_process.args(args).stderr(Stdio::piped()).spawn();
if let Err(e) = child {
return Err(e.to_string());
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
let mut logs = Vec::new();
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Log(_level, content) => {
logs.push(content);
}
FfmpegEvent::LogEOF => break,
_ => {}
}
}
if let Err(e) = child.wait().await {
log::error!("Generic ffmpeg command error: {e}");
return Err(e.to_string());
}
Ok(logs.join("\n"))
}
/// Trying to run ffmpeg for version
pub async fn check_ffmpeg() -> Result<String, String> {
let child = tokio::process::Command::new(ffmpeg_path())
.arg("-version")
.stdout(Stdio::piped())
.spawn();
if let Err(e) = child {
log::error!("Failed to spawn ffmpeg process: {e}");
return Err(e.to_string());
}
let mut child = child.unwrap();
let stdout = child.stdout.take();
if stdout.is_none() {
log::error!("Failed to take ffmpeg output");
return Err("Failed to take ffmpeg output".into());
}
let stdout = stdout.unwrap();
let reader = BufReader::new(stdout);
let mut parser = FfmpegLogParser::new(reader);
let mut version = None;
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::ParsedVersion(v) => version = Some(v.version),
FfmpegEvent::LogEOF => break,
_ => {}
}
}
if let Some(version) = version {
Ok(version)
} else {
Err("Failed to parse version from output".into())
}
}
fn ffmpeg_path() -> PathBuf {
let mut path = Path::new("ffmpeg").to_path_buf();
if cfg!(windows) {
path.set_extension("exe");
}
path
}
fn ffprobe_path() -> PathBuf {
let mut path = Path::new("ffprobe").to_path_buf();
if cfg!(windows) {
@@ -442,7 +48,7 @@ fn ffprobe_path() -> PathBuf {
/// # Returns
/// A `Result` containing the video metadata or an error message.
pub async fn extract_video_metadata(file_path: &Path) -> Result<VideoMetadata, String> {
let mut ffprobe_process = tokio::process::Command::new("ffprobe");
let mut ffprobe_process = tokio::process::Command::new(ffprobe_path());
#[cfg(target_os = "windows")]
ffprobe_process.creation_flags(CREATE_NO_WINDOW);
@@ -503,434 +109,3 @@ pub async fn extract_video_metadata(file_path: &Path) -> Result<VideoMetadata, S
}
Ok(metadata)
}
/// Generate thumbnail file from video, capturing a frame at the specified timestamp.
///
/// # Arguments
/// * `video_full_path` - The full path to the video file.
/// * `timestamp` - The timestamp (in seconds) to capture the thumbnail.
///
/// # Returns
/// The path to the generated thumbnail image.
pub async fn generate_thumbnail(video_full_path: &Path, timestamp: f64) -> Result<PathBuf, String> {
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
let thumbnail_full_path = video_full_path.with_extension("jpg");
let output = ffmpeg_process
.args(["-i", &format!("{}", video_full_path.display())])
.args(["-ss", &timestamp.to_string()])
.args(["-vframes", "1"])
.args(["-y", thumbnail_full_path.to_str().unwrap()])
.output()
.await
.map_err(|e| format!("生成缩略图失败: {e}"))?;
if !output.status.success() {
return Err(format!(
"ffmpeg生成缩略图失败: {}",
String::from_utf8_lossy(&output.stderr)
));
}
// 记录生成的缩略图信息
if let Ok(metadata) = std::fs::metadata(&thumbnail_full_path) {
log::info!(
"生成缩略图完成: {} (文件大小: {} bytes)",
thumbnail_full_path.display(),
metadata.len()
);
} else {
log::info!("生成缩略图完成: {}", thumbnail_full_path.display());
}
Ok(thumbnail_full_path)
}
/// Check if all videos have same encoding and resolution
pub async fn check_videos(video_paths: &[&Path]) -> bool {
// check if all playlist paths exist
let mut video_codec = "".to_owned();
let mut audio_codec = "".to_owned();
let mut width = 0;
let mut height = 0;
for video_path in video_paths.iter() {
if !Path::new(video_path).exists() {
continue;
}
let metadata = extract_video_metadata(Path::new(video_path)).await;
if metadata.is_err() {
log::error!(
"Failed to extract video metadata: {}",
metadata.unwrap_err()
);
return false;
}
let metadata = metadata.unwrap();
// check video codec
if !video_codec.is_empty() && metadata.video_codec != video_codec {
log::error!("Video codec does not match: {}", video_path.display());
return false;
} else {
video_codec = metadata.video_codec;
}
// check audio codec
if !audio_codec.is_empty() && metadata.audio_codec != audio_codec {
log::error!("Audio codec does not match: {}", video_path.display());
return false;
} else {
audio_codec = metadata.audio_codec;
}
// check width
if width > 0 && metadata.width != width {
log::error!("Video width does not match: {}", video_path.display());
return false;
} else {
width = metadata.width;
}
// check height
if height > 0 && metadata.height != height {
log::error!("Video height does not match: {}", video_path.display());
return false;
} else {
height = metadata.height;
}
}
true
}
pub async fn convert_fmp4_to_ts_raw(
header_data: &[u8],
source_data: &[u8],
output_ts: &Path,
) -> Result<(), String> {
// Combine the data
let mut combined_data = header_data.to_vec();
combined_data.extend_from_slice(source_data);
// Build ffmpeg command to convert combined data to TS
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
let child = ffmpeg_process
.args(["-f", "mp4"])
.args(["-i", "-"]) // Read from stdin
.args(["-c:v", "libx264"])
.args(["-b:v", "6000k"])
.args(["-maxrate", "10000k"])
.args(["-bufsize", "16000k"])
.args(["-c:a", "copy"])
.args(["-threads", "0"])
.args(["-f", "mpegts"])
.args(["-y", output_ts.to_str().unwrap()]) // Overwrite output
.args(["-progress", "pipe:2"]) // Progress to stderr
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
if let Err(e) = child {
return Err(format!("Failed to spawn ffmpeg process: {e}"));
}
let mut child = child.unwrap();
// Write the combined data to stdin and close it
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(&combined_data)
.await
.map_err(|e| format!("Failed to write data to ffmpeg stdin: {e}"))?;
// stdin is automatically closed when dropped
}
// Parse ffmpeg output for progress and errors
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
let mut conversion_error = None;
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::LogEOF => break,
FfmpegEvent::Log(level, content) => {
if content.contains("error") || level == LogLevel::Error {
log::error!("fMP4 to TS conversion error: {content}");
}
}
FfmpegEvent::Error(e) => {
log::error!("fMP4 to TS conversion error: {e}");
conversion_error = Some(e.to_string());
}
_ => {}
}
}
// Wait for ffmpeg to complete
if let Err(e) = child.wait().await {
return Err(format!("ffmpeg process failed: {e}"));
}
// Check for conversion errors
if let Some(error) = conversion_error {
Err(error)
} else {
Ok(())
}
}
/// Convert fragmented MP4 (fMP4) files to MPEG-TS format
/// Combines an initialization segment (header) and a media segment (source) into a single TS file
///
/// # Arguments
/// * `header` - Path to the initialization segment (.mp4)
/// * `source` - Path to the media segment (.m4s)
///
/// # Returns
/// A `Result` indicating success or failure with error message
#[allow(unused)]
pub async fn convert_fmp4_to_ts(header: &Path, source: &Path) -> Result<(), String> {
log::info!(
"Converting fMP4 to TS: {} + {}",
header.display(),
source.display()
);
// Check if input files exist
if !header.exists() {
return Err(format!("Header file does not exist: {}", header.display()));
}
if !source.exists() {
return Err(format!("Source file does not exist: {}", source.display()));
}
let output_ts = source.with_extension("ts");
// Read the header and source files into memory
let header_data = tokio::fs::read(header)
.await
.map_err(|e| format!("Failed to read header file: {e}"))?;
let source_data = tokio::fs::read(source)
.await
.map_err(|e| format!("Failed to read source file: {e}"))?;
convert_fmp4_to_ts_raw(&header_data, &source_data, &output_ts).await
}
// tests
#[cfg(test)]
mod tests {
use super::*;
// 测试 Range 结构体
#[test]
fn test_range_creation() {
let range = Range {
start: 10.0,
end: 30.0,
};
assert_eq!(range.start, 10.0);
assert_eq!(range.end, 30.0);
assert_eq!(range.duration(), 20.0);
}
#[test]
fn test_range_duration() {
let range = Range {
start: 0.0,
end: 60.0,
};
assert_eq!(range.duration(), 60.0);
let range2 = Range {
start: 15.5,
end: 45.5,
};
assert_eq!(range2.duration(), 30.0);
}
#[test]
fn test_range_display() {
let range = Range {
start: 5.0,
end: 25.0,
};
assert_eq!(range.to_string(), "[5, 25]");
}
#[test]
fn test_range_edge_cases() {
let zero_range = Range {
start: 0.0,
end: 0.0,
};
assert_eq!(zero_range.duration(), 0.0);
let negative_start = Range {
start: -5.0,
end: 10.0,
};
assert_eq!(negative_start.duration(), 15.0);
let large_range = Range {
start: 1000.0,
end: 2000.0,
};
assert_eq!(large_range.duration(), 1000.0);
}
// 测试视频元数据提取
#[tokio::test]
async fn test_extract_video_metadata() {
let test_video = Path::new("tests/video/test.mp4");
if test_video.exists() {
let metadata = extract_video_metadata(test_video).await.unwrap();
println!("metadata: {:?}", metadata);
assert!(metadata.duration > 0.0);
assert!(metadata.width > 0);
assert!(metadata.height > 0);
}
}
// 测试音频时长获取
#[tokio::test]
async fn test_get_audio_duration() {
let test_audio = Path::new("tests/audio/test.wav");
if test_audio.exists() {
let duration = get_audio_duration(test_audio).await.unwrap();
assert!(duration > 0);
}
}
// 测试缩略图生成
#[tokio::test]
async fn test_generate_thumbnail() {
let file = Path::new("tests/video/test.mp4");
if file.exists() {
let thumbnail_file = generate_thumbnail(file, 0.0).await.unwrap();
assert!(thumbnail_file.exists());
assert_eq!(thumbnail_file.extension().unwrap(), "jpg");
// clean up
let _ = std::fs::remove_file(thumbnail_file);
}
}
// 测试 FFmpeg 版本检查
#[tokio::test]
async fn test_check_ffmpeg() {
let result = check_ffmpeg().await;
match result {
Ok(version) => {
assert!(!version.is_empty());
// FFmpeg 版本字符串可能不包含 "ffmpeg" 这个词,所以检查是否包含数字
assert!(version.chars().any(|c| c.is_ascii_digit()));
}
Err(_) => {
// FFmpeg 可能没有安装,这是正常的
println!("FFmpeg not available for testing");
}
}
}
// 测试通用 FFmpeg 命令
#[tokio::test]
async fn test_generic_ffmpeg_command() {
let result = generic_ffmpeg_command(&["-version"]).await;
match result {
Ok(_output) => {
// 输出可能为空或者不包含 "ffmpeg" 字符串,我们只检查函数能正常执行
println!("FFmpeg command executed successfully");
}
Err(_) => {
// FFmpeg 可能没有安装,这是正常的
println!("FFmpeg not available for testing");
}
}
}
// 测试路径构建函数
#[test]
fn test_ffmpeg_paths() {
let ffmpeg_path = ffmpeg_path();
let ffprobe_path = ffprobe_path();
#[cfg(windows)]
{
assert_eq!(ffmpeg_path.extension().unwrap(), "exe");
assert_eq!(ffprobe_path.extension().unwrap(), "exe");
}
#[cfg(not(windows))]
{
assert_eq!(ffmpeg_path.file_name().unwrap(), "ffmpeg");
assert_eq!(ffprobe_path.file_name().unwrap(), "ffprobe");
}
}
// 测试音频分块目录结构
#[test]
fn test_audio_chunk_directory_structure() {
let test_file = Path::new("tests/audio/test.wav");
let output_path = test_file.with_extension("wav");
let output_dir = output_path.parent().unwrap();
let base_name = output_path.file_stem().unwrap().to_str().unwrap();
let chunk_dir = output_dir.join(format!("{base_name}_chunks"));
assert!(chunk_dir.to_string_lossy().contains("_chunks"));
assert!(chunk_dir.to_string_lossy().contains("test"));
}
// 测试 fMP4 到 TS 转换
#[tokio::test]
async fn test_convert_fmp4_to_ts() {
let header_file = Path::new("tests/video/init.m4s");
let segment_file = Path::new("tests/video/segment.m4s");
let output_file = Path::new("tests/video/segment.ts");
// 如果测试文件存在,则进行转换测试
if header_file.exists() && segment_file.exists() {
let result = convert_fmp4_to_ts(header_file, segment_file).await;
// 检查转换是否成功
match result {
Ok(()) => {
// 检查输出文件是否创建
assert!(output_file.exists());
log::info!("fMP4 to TS conversion test passed");
// 清理测试文件
let _ = std::fs::remove_file(output_file);
}
Err(e) => {
log::error!("fMP4 to TS conversion test failed: {}", e);
// 对于测试文件不存在或其他错误,我们仍然认为测试通过
// 因为这不是功能性问题
}
}
} else {
log::info!("Test files not found, skipping fMP4 to TS conversion test");
}
}
// 测试 fMP4 到 TS 转换的错误处理
#[tokio::test]
async fn test_convert_fmp4_to_ts_error_handling() {
let non_existent_header = Path::new("tests/video/non_existent_init.mp4");
let non_existent_segment = Path::new("tests/video/non_existent_segment.m4s");
// 测试文件不存在的错误处理
let result = convert_fmp4_to_ts(non_existent_header, non_existent_segment).await;
assert!(result.is_err());
let error_msg = result.unwrap_err();
assert!(error_msg.contains("does not exist"));
log::info!("fMP4 to TS error handling test passed");
}
}

View File

@@ -1,129 +0,0 @@
use std::path::Path;
use m3u8_rs::Map;
use tokio::io::AsyncWriteExt;
use crate::ffmpeg::{ffmpeg_path, general::handle_ffmpeg_process};
use super::Range;
pub async fn playlist_to_video(
playlist_path: &Path,
output_path: &Path,
range: Option<Range>,
) -> Result<(), String> {
let (_, playlist) = m3u8_rs::parse_media_playlist(
&tokio::fs::read(playlist_path)
.await
.map_err(|e| e.to_string())?,
)
.unwrap();
let mut start_offset = None;
let mut segments = Vec::new();
if let Some(range) = &range {
let mut duration = 0.0;
for s in playlist.segments.clone() {
if range.is_in(duration) || range.is_in(duration + s.duration as f64) {
segments.push(s.clone());
if start_offset.is_none() {
start_offset = Some(range.start - duration);
}
}
duration += s.duration as f64;
}
} else {
segments = playlist.segments.clone();
}
if segments.is_empty() {
return Err("No segments found".to_string());
}
let first_segment = playlist.segments.first().unwrap().clone();
let mut header_url = first_segment
.unknown_tags
.iter()
.find(|t| t.tag == "X-MAP")
.map(|t| {
let rest = t.rest.clone().unwrap();
rest.split('=').nth(1).unwrap().replace("\\\"", "")
});
if header_url.is_none() {
// map: Some(Map { uri: "h1758725308.m4s"
if let Some(Map { uri, .. }) = &first_segment.map {
header_url = Some(uri.clone());
}
}
// write all segments to clip_file
{
let playlist_folder = playlist_path.parent().unwrap();
let output_folder = output_path.parent().unwrap();
if !output_folder.exists() {
std::fs::create_dir_all(output_folder).unwrap();
}
let mut file = tokio::fs::File::create(&output_path).await.unwrap();
if let Some(header_url) = header_url {
let header_data = tokio::fs::read(playlist_folder.join(header_url))
.await
.unwrap();
file.write_all(&header_data).await.unwrap();
}
for s in segments {
// read segment
let segment_file_path = playlist_folder.join(s.uri);
let segment_data = tokio::fs::read(&segment_file_path).await.unwrap();
// append segment data to clip_file
file.write_all(&segment_data).await.unwrap();
}
file.flush().await.unwrap();
}
// transcode copy to fix timestamp
{
let tmp_output_path = output_path.with_extension("tmp.mp4");
super::transcode(output_path, &tmp_output_path, true).await?;
// remove original file
let _ = tokio::fs::remove_file(output_path).await;
// rename tmp_output_path to output_path
let _ = tokio::fs::rename(tmp_output_path, output_path).await;
}
// trim for precised duration
if let Some(start_offset) = start_offset {
let tmp_output_path = output_path.with_extension("tmp.mp4");
super::trim_video(
output_path,
&tmp_output_path,
start_offset,
range.as_ref().unwrap().duration(),
)
.await?;
// remove original file
let _ = tokio::fs::remove_file(output_path).await;
// rename tmp_output_path to output_path
let _ = tokio::fs::rename(tmp_output_path, output_path).await;
}
Ok(())
}
pub async fn playlists_to_video(playlists: &[&Path], output_path: &Path) -> Result<(), String> {
let mut segments = Vec::new();
for (i, playlist) in playlists.iter().enumerate() {
let video_path = output_path.with_extension(format!("{}.mp4", i));
playlist_to_video(playlist, &video_path, None).await?;
segments.push(video_path);
}
super::general::concat_videos(&segments, output_path).await?;
// clean up segments
for segment in segments {
let _ = tokio::fs::remove_file(segment).await;
}
Ok(())
}

View File

@@ -46,9 +46,11 @@ pub trait RecorderTrait<T>: RecorderBasicTrait<T> {
self.quit().store(true, atomic::Ordering::Relaxed);
if let Some(danmu_task) = self.danmu_task().lock().await.take() {
danmu_task.abort();
let _ = danmu_task.await;
}
if let Some(record_task) = self.record_task().lock().await.take() {
record_task.abort();
let _ = record_task.await;
}
}
async fn should_record(&self) -> bool {

View File

@@ -3,10 +3,7 @@ use std::path::Path;
use m3u8_rs::Map;
use tokio::io::AsyncWriteExt;
use crate::{
ffmpeg::{ffmpeg_path, general::handle_ffmpeg_process},
progress::progress_reporter::ProgressReporterTrait,
};
use crate::progress::progress_reporter::ProgressReporterTrait;
#[cfg(target_os = "windows")]
use crate::ffmpeg::CREATE_NO_WINDOW;
@@ -16,48 +13,6 @@ use std::os::windows::process::CommandExt;
use super::Range;
pub async fn cache_playlist(
reporter: Option<&impl ProgressReporterTrait>,
playlist_url: &str,
work_dir: &Path,
) -> Result<(), String> {
// ffmpeg -i "http://example.com/live/stream.m3u8" \
// -timeout 10 \
// -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 10 \
// -hls_list_size 0 \
// -hls_time 10 \
// -hls_flags append_list+program_date_time \
// -hls_segment_filename "cache_%Y%m%d_%H%M%S_%03d.ts" \
// -strftime 1 \
// -c copy \
// playlist.m3u8
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
// set work_dir
ffmpeg_process.current_dir(work_dir);
ffmpeg_process
.args(["-reconnect", "0"])
.args(["-reconnect_streamed", "0"])
.args(["-rw_timeout", "5000000"])
.args(["-i", playlist_url])
.args(["-hls_list_size", "0"])
.args(["-hls_time", "5"])
.args(["-hls_flags", "append_list+program_date_time"])
.args(["-hls_segment_filename", "%Y%m%d_%H%M%S.ts"])
.args(["-strftime", "1"])
.args(["-c", "copy"])
.args(["-progress", "pipe:2"])
.args(["-y", "playlist.m3u8"]);
handle_ffmpeg_process(reporter, &mut ffmpeg_process).await?;
Ok(())
}
pub async fn playlist_to_video(
reporter: Option<&impl ProgressReporterTrait>,
playlist_path: &Path,

View File

@@ -29,6 +29,8 @@ use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
#[cfg(feature = "gui")]
use tauri_plugin_notification::NotificationExt;
use thiserror::Error;
use tokio::fs::{remove_file, write, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
@@ -205,9 +207,24 @@ impl RecorderManager {
while let Ok(event) = rx.recv().await {
match event {
RecorderEvent::LiveStart { recorder } => {
let event =
events::new_webhook_event(events::LIVE_STARTED, Payload::Room(recorder));
let event = events::new_webhook_event(
events::LIVE_STARTED,
Payload::Room(recorder.clone()),
);
let _ = self.webhook_poster.post_event(&event).await;
if self.config.read().await.live_start_notify {
#[cfg(feature = "gui")]
self.app_handle
.notification()
.builder()
.title("BiliShadowReplay - 直播开始")
.body(format!(
"{} 开启了直播:{}",
recorder.user_info.user_name, recorder.room_info.room_title
))
.show()
.unwrap();
}
}
RecorderEvent::LiveEnd {
platform,
@@ -220,6 +237,19 @@ impl RecorderManager {
);
let _ = self.webhook_poster.post_event(&event).await;
self.handle_live_end(platform, room_id, &recorder).await;
if self.config.read().await.live_end_notify {
#[cfg(feature = "gui")]
self.app_handle
.notification()
.builder()
.title("BiliShadowReplay - 直播结束")
.body(format!(
"{} 结束了直播:{}",
recorder.user_info.user_name, recorder.room_info.room_title
))
.show()
.unwrap();
}
}
RecorderEvent::RecordStart { recorder } => {
// add record entry into db