Compare commits

11 Commits

Author SHA1 Message Date
Xinrea f20636a107 bump version to 2.13.6 2025-09-28 00:12:26 +08:00
Xinrea 787a30e6f7 fix: clip on fmp4 -> ts archive 2025-09-28 00:10:31 +08:00
Xinrea d1d217be18 bump version to 2.13.5 2025-09-27 15:07:53 +08:00
Xinrea 944d0a371a fix: danmu from ws 2025-09-27 15:06:53 +08:00
Xinrea 0df03e0c9c fix: create proxy master playlist for old fmp4 playlist 2025-09-27 14:54:13 +08:00
Xinrea 7ffdf65705 Revert "feat: convert old fmp4 archives into ts" (This reverts commit 89cdf91a48.) 2025-09-27 01:00:39 +08:00
Xinrea 89cdf91a48 feat: convert old fmp4 archives into ts 2025-09-27 00:25:57 +08:00
Xinrea 43ebc27044 ci/cd: update docker script 2025-09-26 22:10:44 +08:00
Xinrea e6159555f3 feat: migrate sse to websocket 2025-09-26 21:30:55 +08:00
Xinrea 1f2508aae9 fix: disable http2 for http server 2025-09-26 19:52:57 +08:00
Xinrea ad13f58fa7 ci/cd: update final image 2025-09-26 19:19:24 +08:00
18 changed files with 857 additions and 429 deletions

View File

@@ -48,15 +48,9 @@ COPY src-tauri/crates ./src-tauri/crates
WORKDIR /app/src-tauri
RUN rustup component add rustfmt
RUN cargo build --no-default-features --features headless --release
# Download and install FFmpeg static build
RUN wget https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz \
&& tar xf ffmpeg-release-amd64-static.tar.xz \
&& mv ffmpeg-*-static/ffmpeg ./ \
&& mv ffmpeg-*-static/ffprobe ./ \
&& rm -rf ffmpeg-*-static ffmpeg-release-amd64-static.tar.xz
# Final stage
FROM debian:bookworm-slim AS final
FROM debian:trixie-slim AS final
WORKDIR /app
@@ -67,13 +61,13 @@ RUN apt-get update && apt-get install -y \
fonts-wqy-microhei \
netbase \
nscd \
ffmpeg \
&& update-ca-certificates \
&& rm -rf /var/lib/apt/lists/*
RUN touch /etc/netgroup
RUN mkdir -p /var/run/nscd && chmod 755 /var/run/nscd
RUN nscd
# Add /app to PATH
ENV PATH="/app:${PATH}"
@@ -83,8 +77,6 @@ COPY --from=frontend-builder /app/dist ./dist
# Copy built Rust binary
COPY --from=rust-builder /app/src-tauri/target/release/bili-shadowreplay .
COPY --from=rust-builder /app/src-tauri/ffmpeg ./ffmpeg
COPY --from=rust-builder /app/src-tauri/ffprobe ./ffprobe
# Expose port
EXPOSE 3000

View File

@@ -1,7 +1,7 @@
{
"name": "bili-shadowreplay",
"private": true,
"version": "2.13.4",
"version": "2.13.6",
"type": "module",
"scripts": {
"dev": "vite",
@@ -30,7 +30,8 @@
"@tauri-apps/plugin-sql": "~2",
"lucide-svelte": "^0.479.0",
"marked": "^16.1.1",
"qrcode": "^1.5.4"
"qrcode": "^1.5.4",
"socket.io-client": "^4.8.1"
},
"devDependencies": {
"@sveltejs/vite-plugin-svelte": "^2.0.0",

src-tauri/Cargo.lock (generated, 156 lines changed)
View File

@@ -124,6 +124,15 @@ version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
name = "arbitrary"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1"
dependencies = [
"derive_arbitrary",
]
[[package]]
name = "ashpd"
version = "0.11.0"
@@ -442,7 +451,7 @@ dependencies = [
"hyper 1.6.0",
"hyper-util",
"itoa",
"matchit",
"matchit 0.7.3",
"memchr",
"mime",
"multer",
@@ -544,7 +553,7 @@ checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
[[package]]
name = "bili-shadowreplay"
version = "2.13.4"
version = "2.13.6"
dependencies = [
"async-ffmpeg-sidecar",
"async-std",
@@ -575,6 +584,7 @@ dependencies = [
"serde_derive",
"serde_json",
"simplelog",
"socketioxide",
"sqlx",
"srtparse",
"sysinfo",
@@ -592,6 +602,7 @@ dependencies = [
"tauri-utils",
"thiserror 2.0.12",
"tokio",
"tokio-stream",
"tokio-util",
"toml 0.7.8",
"tower-http 0.5.2",
@@ -1313,7 +1324,7 @@ dependencies = [
"serde_json",
"thiserror 2.0.12",
"tokio",
"tokio-tungstenite",
"tokio-tungstenite 0.27.0",
"tonic-build",
"url",
"urlencoding",
@@ -1539,6 +1550,17 @@ dependencies = [
"serde",
]
[[package]]
name = "derive_arbitrary"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.104",
]
[[package]]
name = "derive_more"
version = "0.99.20"
@@ -1777,6 +1799,45 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3d8a32ae18130a3c84dd492d4215c3d913c3b07c6b63c2eb3eb7ff1101ab7bf"
[[package]]
name = "engineioxide"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88a4ef9fd57bc7e9fbe59550d3cba88536fbe47fba05ab33088edc4b09d3267a"
dependencies = [
"base64 0.22.1",
"bytes",
"engineioxide-core",
"futures-core",
"futures-util",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"serde",
"serde_json",
"smallvec",
"thiserror 2.0.12",
"tokio",
"tokio-tungstenite 0.26.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "engineioxide-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04e5d58eb7374df380cbb53ef65f9c35f544c9c217528adb1458c8df05978475"
dependencies = [
"base64 0.22.1",
"bytes",
"rand 0.9.1",
"serde",
]
[[package]]
name = "enumflags2"
version = "0.6.4"
@@ -3509,6 +3570,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f926ade0c4e170215ae43342bf13b9310a437609c81f29f86c5df6657582ef9"
[[package]]
name = "md-5"
version = "0.10.6"
@@ -5790,6 +5857,58 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "socketioxide"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "476190583b592f1e3d55584269600f2d3c4f18af36adad03c41c27e82dcb6bd5"
dependencies = [
"bytes",
"engineioxide",
"futures-core",
"futures-util",
"http 1.3.1",
"http-body 1.0.1",
"hyper 1.6.0",
"matchit 0.8.6",
"pin-project-lite",
"serde",
"socketioxide-core",
"socketioxide-parser-common",
"thiserror 2.0.12",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "socketioxide-core"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07b95089a961994921d23dd6e70792a06f5daa250b5ec8919f6f9de371d2cc5"
dependencies = [
"arbitrary",
"bytes",
"engineioxide-core",
"futures-core",
"serde",
"smallvec",
"thiserror 2.0.12",
]
[[package]]
name = "socketioxide-parser-common"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fe3b57122bf9c17fe8c2f364e1d307983068396cfb1b0407ec897de411f8033"
dependencies = [
"bytes",
"itoa",
"serde",
"serde_json",
"socketioxide-core",
]
[[package]]
name = "softbuffer"
version = "0.4.6"
@@ -7009,6 +7128,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.26.2",
]
[[package]]
name = "tokio-tungstenite"
version = "0.27.0"
@@ -7020,7 +7151,7 @@ dependencies = [
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite",
"tungstenite 0.27.0",
]
[[package]]
@@ -7296,6 +7427,23 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13"
dependencies = [
"bytes",
"data-encoding",
"http 1.3.1",
"httparse",
"log",
"rand 0.9.1",
"sha1",
"thiserror 2.0.12",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.27.0"

View File

@@ -4,7 +4,7 @@ resolver = "2"
[package]
name = "bili-shadowreplay"
version = "2.13.4"
version = "2.13.6"
description = "BiliBili ShadowReplay"
authors = ["Xinrea"]
license = ""
@@ -55,12 +55,14 @@ tower-http = { version = "0.5", features = ["cors", "fs"] }
futures-core = "0.3"
futures = "0.3"
tokio-util = { version = "0.7", features = ["io"] }
tokio-stream = "0.1"
clap = { version = "4.5.37", features = ["derive"] }
url = "2.5.4"
srtparse = "0.2.0"
thiserror = "2"
deno_core = "0.355"
sanitize-filename = "0.6.0"
socketioxide = "0.17.2"
[features]
# this feature is used for production builds or when `devPath` points to the filesystem

View File

@@ -0,0 +1,122 @@
use std::{
path::{Path, PathBuf},
process::Stdio,
};
use async_ffmpeg_sidecar::{event::FfmpegEvent, log_parser::FfmpegLogParser};
use tokio::io::{AsyncWriteExt, BufReader};
use crate::progress::progress_reporter::ProgressReporterTrait;
use super::ffmpeg_path;
#[cfg(target_os = "windows")]
const CREATE_NO_WINDOW: u32 = 0x08000000;
#[cfg(target_os = "windows")]
#[allow(unused_imports)]
use std::os::windows::process::CommandExt;
/// Generate a random filename in hex
pub async fn random_filename() -> String {
format!("{:x}", rand::random::<u64>())
}
pub async fn handle_ffmpeg_process(
reporter: Option<&impl ProgressReporterTrait>,
ffmpeg_process: &mut tokio::process::Command,
) -> Result<(), String> {
let child = ffmpeg_process.stderr(Stdio::piped()).spawn();
if let Err(e) = child {
return Err(e.to_string());
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Progress(p) => {
if let Some(reporter) = reporter {
reporter.update(p.time.to_string().as_str());
}
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Error(e) => {
return Err(e.to_string());
}
_ => {}
}
}
if let Err(e) = child.wait().await {
return Err(e.to_string());
}
if let Some(reporter) = reporter {
reporter.update("合成完成");
}
Ok(())
}
pub async fn concat_videos(
reporter: Option<&impl ProgressReporterTrait>,
videos: &[PathBuf],
output_path: &Path,
) -> Result<(), String> {
// ffmpeg -f concat -safe 0 -i filelist.txt -c copy output.mp4
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
let output_folder = output_path.parent().unwrap();
if !output_folder.exists() {
std::fs::create_dir_all(output_folder).unwrap();
}
let filelist_filename = format!("filelist_{}.txt", random_filename().await);
let mut filelist = tokio::fs::File::create(&output_folder.join(&filelist_filename))
.await
.unwrap();
for video in videos {
filelist
.write_all(format!("file '{}'\n", video.display()).as_bytes())
.await
.unwrap();
}
filelist.flush().await.unwrap();
// Convert &[PathBuf] to &[&Path] for check_videos
let video_refs: Vec<&Path> = videos.iter().map(|p| p.as_path()).collect();
let should_encode = !super::check_videos(&video_refs).await;
ffmpeg_process.args([
"-f",
"concat",
"-safe",
"0",
"-i",
output_folder.join(&filelist_filename).to_str().unwrap(),
]);
if should_encode {
ffmpeg_process.args(["-vf", "scale=1920:1080"]);
ffmpeg_process.args(["-r", "60"]);
ffmpeg_process.args(["-c", "libx264"]);
ffmpeg_process.args(["-c:a", "aac"]);
ffmpeg_process.args(["-b:v", "6000k"]);
ffmpeg_process.args(["-b:a", "128k"]);
ffmpeg_process.args(["-threads", "0"]);
} else {
ffmpeg_process.args(["-c", "copy"]);
}
ffmpeg_process.args([output_path.to_str().unwrap()]);
ffmpeg_process.args(["-progress", "pipe:2"]);
ffmpeg_process.args(["-y"]);
handle_ffmpeg_process(reporter, &mut ffmpeg_process).await?;
// clean up filelist
let _ = tokio::fs::remove_file(output_folder.join(&filelist_filename)).await;
Ok(())
}
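
For reference, a minimal call sketch for the concat_videos helper above (assumed usage only: the file names are placeholders, the reporter type path follows the crate paths seen elsewhere in this diff, and error handling is left to the caller):

use std::path::{Path, PathBuf};
use crate::progress::progress_reporter::ProgressReporter;

async fn concat_example() -> Result<(), String> {
    // Two previously produced clips (placeholder names) merged into one output file.
    let parts = vec![PathBuf::from("part_0.mp4"), PathBuf::from("part_1.mp4")];
    // Passing None skips progress reporting, mirroring the recorder call sites further down.
    crate::ffmpeg::general::concat_videos(
        None::<&ProgressReporter>,
        &parts,
        Path::new("merged.mp4"),
    )
    .await
}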

View File

@@ -2,6 +2,9 @@ use std::fmt;
use std::path::{Path, PathBuf};
use std::process::Stdio;
pub mod general;
pub mod playlist;
use crate::constants;
use crate::progress::progress_reporter::{ProgressReporter, ProgressReporterTrait};
use crate::subtitle_generator::whisper_online;
@@ -45,67 +48,100 @@ impl Range {
pub fn duration(&self) -> f64 {
self.end - self.start
}
pub fn is_in(&self, v: f64) -> bool {
v >= self.start && v <= self.end
}
}
pub async fn clip_from_m3u8(
pub async fn transcode(
reporter: Option<&impl ProgressReporterTrait>,
is_fmp4: bool,
m3u8_index: &Path,
file: &Path,
output_path: &Path,
range: Option<&Range>,
fix_encoding: bool,
copy_codecs: bool,
) -> Result<(), String> {
// first check output folder exists
log::debug!("Clip: is_fmp4: {}", is_fmp4);
let output_folder = output_path.parent().unwrap();
if !output_folder.exists() {
log::warn!(
"Output folder does not exist, creating: {}",
output_folder.display()
);
std::fs::create_dir_all(output_folder).unwrap();
}
// ffmpeg -i fixed_\[30655190\]1742887114_0325084106_81.5.mp4 -c:v libx264 -c:a aac -b:v 6000k -b:a 64k -compression_level 0 -threads 0 output.mp4
log::info!("Transcode: {} copy: {}", file.display(), copy_codecs);
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
if is_fmp4 {
// using output seek for fmp4 stream
ffmpeg_process.args(["-i", &format!("{}", m3u8_index.display())]);
if let Some(range) = range {
ffmpeg_process
.args(["-ss", &range.start.to_string()])
.args(["-t", &range.duration().to_string()]);
}
ffmpeg_process.args(["-i", file.to_str().unwrap()]);
if copy_codecs {
ffmpeg_process.args(["-c:v", "copy"]).args(["-c:a", "copy"]);
} else {
// using input seek for ts stream
if let Some(range) = range {
ffmpeg_process
.args(["-ss", &range.start.to_string()])
.args(["-t", &range.duration().to_string()]);
}
ffmpeg_process.args(["-i", &format!("{}", m3u8_index.display())]);
}
if fix_encoding {
ffmpeg_process
.args(["-c:v", "libx264"])
.args(["-c:a", "copy"])
.args(["-b:v", "6000k"]);
} else {
ffmpeg_process.args(["-c", "copy"]);
.args(["-c:a", "aac"])
.args(["-b:v", "6000k"])
.args(["-b:a", "128k"])
.args(["-threads", "0"]);
}
let child = ffmpeg_process
.args(["-y", output_path.to_str().unwrap()])
.args(["-progress", "pipe:2"])
.args([output_path.to_str().unwrap()])
.args(["-y"])
.stderr(Stdio::piped())
.spawn();
if let Err(e) = child {
return Err(format!("Spawn ffmpeg process failed: {e}"));
return Err(e.to_string());
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Progress(p) => {
if reporter.is_none() {
continue;
}
reporter
.unwrap()
.update(format!("压制中:{}", p.time).as_str());
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Error(e) => {
log::error!("Transcode error: {e}");
return Err(e.to_string());
}
_ => {}
}
}
if let Err(e) = child.wait().await {
return Err(e.to_string());
}
Ok(())
}
pub async fn trim_video(
reporter: Option<&impl ProgressReporterTrait>,
file: &Path,
output_path: &Path,
start_time: f64,
duration: f64,
) -> Result<(), String> {
// ffmpeg -i fixed_\[30655190\]1742887114_0325084106_81.5.mp4 -ss 0 -t 10 output.mp4
log::info!("Trim video task start: {}", file.display());
let mut ffmpeg_process = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
ffmpeg_process.creation_flags(CREATE_NO_WINDOW);
ffmpeg_process.args(["-i", file.to_str().unwrap()]);
ffmpeg_process.args(["-ss", &start_time.to_string()]);
ffmpeg_process.args(["-t", &duration.to_string()]);
ffmpeg_process.args(["-c", "copy"]);
ffmpeg_process.args([output_path.to_str().unwrap()]);
ffmpeg_process.args(["-y"]);
ffmpeg_process.args(["-progress", "pipe:2"]);
ffmpeg_process.stderr(Stdio::piped());
let child = ffmpeg_process.spawn();
if let Err(e) = child {
return Err(e.to_string());
}
let mut child = child.unwrap();
@@ -113,46 +149,33 @@ pub async fn clip_from_m3u8(
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
let mut clip_error = None;
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Progress(p) => {
if reporter.is_none() {
continue;
}
log::debug!("Clip progress: {}", p.time);
reporter
.unwrap()
.update(format!("编码中:{}", p.time).as_str());
.update(format!("切片中:{}", p.time).as_str());
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Log(level, content) => {
// log error if content contains error
if content.contains("error") || level == LogLevel::Error {
log::error!("Clip error: {content}");
}
}
FfmpegEvent::Error(e) => {
log::error!("Clip error: {e}");
clip_error = Some(e.to_string());
log::error!("Trim video error: {e}");
return Err(e.to_string());
}
_ => {}
}
}
if let Err(e) = child.wait().await {
log::error!("Clip error: {e}");
log::error!("Trim video error: {e}");
return Err(e.to_string());
}
if let Some(error) = clip_error {
log::error!("Clip error: {error}");
Err(error)
} else {
log::info!("Clip task end: {}", output_path.display());
log::info!("Trim video task end: {}", output_path.display());
Ok(())
}
}
pub async fn extract_audio_chunks(file: &Path, format: &str) -> Result<PathBuf, String> {
// ffmpeg -i fixed_\[30655190\]1742887114_0325084106_81.5.mp4 -ar 16000 test.wav
@@ -1068,18 +1091,18 @@ pub async fn convert_video_format(
}
}
/// Check if all playlist have same encoding and resolution
pub async fn check_multiple_playlist(playlist_paths: Vec<String>) -> bool {
/// Check if all videos have same encoding and resolution
pub async fn check_videos(video_paths: &[&Path]) -> bool {
// check if all playlist paths exist
let mut video_codec = "".to_owned();
let mut audio_codec = "".to_owned();
let mut width = 0;
let mut height = 0;
for playlist_path in playlist_paths.iter() {
if !Path::new(playlist_path).exists() {
for video_path in video_paths.iter() {
if !Path::new(video_path).exists() {
continue;
}
let metadata = extract_video_metadata(Path::new(playlist_path)).await;
let metadata = extract_video_metadata(Path::new(video_path)).await;
if metadata.is_err() {
log::error!(
"Failed to extract video metadata: {}",
@@ -1091,7 +1114,7 @@ pub async fn check_multiple_playlist(playlist_paths: Vec<String>) -> bool {
// check video codec
if !video_codec.is_empty() && metadata.video_codec != video_codec {
log::error!("Playlist video codec does not match: {}", playlist_path);
log::error!("Video codec does not match: {}", video_path.display());
return false;
} else {
video_codec = metadata.video_codec;
@@ -1099,7 +1122,7 @@ pub async fn check_multiple_playlist(playlist_paths: Vec<String>) -> bool {
// check audio codec
if !audio_codec.is_empty() && metadata.audio_codec != audio_codec {
log::error!("Playlist audio codec does not match: {}", playlist_path);
log::error!("Audio codec does not match: {}", video_path.display());
return false;
} else {
audio_codec = metadata.audio_codec;
@@ -1107,7 +1130,7 @@ pub async fn check_multiple_playlist(playlist_paths: Vec<String>) -> bool {
// check width
if width > 0 && metadata.width != width {
log::error!("Playlist width does not match: {}", playlist_path);
log::error!("Video width does not match: {}", video_path.display());
return false;
} else {
width = metadata.width;
@@ -1115,7 +1138,7 @@ pub async fn check_multiple_playlist(playlist_paths: Vec<String>) -> bool {
// check height
if height > 0 && metadata.height != height {
log::error!("Playlist height does not match: {}", playlist_path);
log::error!("Video height does not match: {}", video_path.display());
return false;
} else {
height = metadata.height;
@@ -1125,112 +1148,6 @@ pub async fn check_multiple_playlist(playlist_paths: Vec<String>) -> bool {
true
}
pub async fn concat_multiple_playlist(
reporter: Option<&ProgressReporter>,
playlist_paths: Vec<String>,
output_path: &Path,
) -> Result<(), String> {
// ffmpeg -i input.m3u8 -vf "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2:black" output.mp4
let mut cmd = tokio::process::Command::new(ffmpeg_path());
#[cfg(target_os = "windows")]
cmd.creation_flags(CREATE_NO_WINDOW);
// create a tmp filelist for concat
let tmp_filelist_path = output_path.with_extension("txt");
{
let mut filelist = tokio::fs::File::create(&tmp_filelist_path)
.await
.map_err(|e| e.to_string())?;
for playlist_path in playlist_paths.iter() {
// write line in the format "file 'path/to/file.m3u8'"
// playlist_path might be a relative path, so we need to convert it to an absolute path
let playlist_path = Path::new(playlist_path).canonicalize().unwrap();
let line = format!("file '{}'\n", playlist_path.display());
filelist
.write_all(line.as_bytes())
.await
.map_err(|e| e.to_string())?;
}
// Ensure all data is written to disk before proceeding
filelist.flush().await.map_err(|e| e.to_string())?;
} // File is automatically closed here
let can_copy_codecs = check_multiple_playlist(playlist_paths.clone()).await;
cmd.args([
"-f",
"concat",
"-safe",
"0",
"-i",
tmp_filelist_path.to_str().unwrap(),
]);
if !can_copy_codecs {
log::info!("Can not copy codecs, will re-encode");
cmd.args(["-vf", "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2:black"])
.args(["-c:v", "libx264"])
.args(["-c:a", "aac"])
.args(["-b:v", "6000k"])
.args(["-avoid_negative_ts", "make_zero"]);
} else {
cmd.args(["-c:v", "copy"]);
cmd.args(["-c:a", "copy"]);
}
let child = cmd
.args(["-y", output_path.to_str().unwrap()])
.stderr(Stdio::piped())
.spawn();
if let Err(e) = child {
return Err(format!("启动ffmpeg进程失败: {e}"));
}
let mut child = child.unwrap();
let stderr = child.stderr.take().unwrap();
let reader = BufReader::new(stderr);
let mut parser = FfmpegLogParser::new(reader);
let mut clip_error = None;
while let Ok(event) = parser.parse_next_event().await {
match event {
FfmpegEvent::Progress(p) => {
log::debug!("Concat progress: {}", p.time);
if let Some(reporter) = reporter {
reporter.update(format!("生成中:{}", p.time).as_str());
}
}
FfmpegEvent::LogEOF => break,
FfmpegEvent::Log(level, content) => {
log::debug!("[{:?}]Concat log: {content}", level);
}
FfmpegEvent::Error(e) => {
log::error!("切片错误: {e}");
clip_error = Some(e.to_string());
}
_ => {}
}
}
if let Err(e) = child.wait().await {
return Err(e.to_string());
}
// Clean up temporary filelist file
if let Err(e) = tokio::fs::remove_file(&tmp_filelist_path).await {
log::warn!("Failed to remove temporary filelist: {}", e);
}
if let Some(error) = clip_error {
return Err(error);
}
log::info!("Concat task end: {}", output_path.display());
Ok(())
}
pub async fn convert_fmp4_to_ts_raw(
header_data: &[u8],
source_data: &[u8],

View File

@@ -0,0 +1,135 @@
use std::path::Path;
use m3u8_rs::Map;
use tokio::io::AsyncWriteExt;
use crate::progress::progress_reporter::ProgressReporterTrait;
use super::Range;
pub async fn playlist_to_video(
reporter: Option<&impl ProgressReporterTrait>,
playlist_path: &Path,
output_path: &Path,
range: Option<Range>,
) -> Result<(), String> {
let (_, playlist) = m3u8_rs::parse_media_playlist(
&tokio::fs::read(playlist_path)
.await
.map_err(|e| e.to_string())?,
)
.unwrap();
let mut start_offset = None;
let mut segments = Vec::new();
if let Some(range) = &range {
let mut duration = 0.0;
for s in playlist.segments.clone() {
if range.is_in(duration) || range.is_in(duration + s.duration as f64) {
segments.push(s.clone());
if start_offset.is_none() {
start_offset = Some(range.start - duration);
}
}
duration += s.duration as f64;
}
} else {
segments = playlist.segments.clone();
}
if segments.is_empty() {
return Err("No segments found".to_string());
}
let first_segment = playlist.segments.first().unwrap().clone();
let mut header_url = first_segment
.unknown_tags
.iter()
.find(|t| t.tag == "X-MAP")
.map(|t| {
let rest = t.rest.clone().unwrap();
rest.split('=').nth(1).unwrap().replace("\\\"", "")
});
if header_url.is_none() {
// map: Some(Map { uri: "h1758725308.m4s"
if let Some(Map { uri, .. }) = &first_segment.map {
header_url = Some(uri.clone());
}
}
// write all segments to clip_file
{
let playlist_folder = playlist_path.parent().unwrap();
let output_folder = output_path.parent().unwrap();
if !output_folder.exists() {
std::fs::create_dir_all(output_folder).unwrap();
}
let mut file = tokio::fs::File::create(&output_path).await.unwrap();
if let Some(header_url) = header_url {
let header_data = tokio::fs::read(playlist_folder.join(header_url))
.await
.unwrap();
file.write_all(&header_data).await.unwrap();
}
for s in segments {
// read segment
let segment_file_path = playlist_folder.join(s.uri);
let segment_data = tokio::fs::read(&segment_file_path).await.unwrap();
// append segment data to clip_file
file.write_all(&segment_data).await.unwrap();
}
file.flush().await.unwrap();
}
// transcode copy to fix timestamp
{
let tmp_output_path = output_path.with_extension("tmp.mp4");
super::transcode(reporter, output_path, &tmp_output_path, true).await?;
// remove original file
let _ = tokio::fs::remove_file(output_path).await;
// rename tmp_output_path to output_path
let _ = tokio::fs::rename(tmp_output_path, output_path).await;
}
// trim to the precise requested duration
if let Some(start_offset) = start_offset {
let tmp_output_path = output_path.with_extension("tmp.mp4");
super::trim_video(
reporter,
output_path,
&tmp_output_path,
start_offset,
range.as_ref().unwrap().duration(),
)
.await?;
// remove original file
let _ = tokio::fs::remove_file(output_path).await;
// rename tmp_output_path to output_path
let _ = tokio::fs::rename(tmp_output_path, output_path).await;
}
Ok(())
}
pub async fn playlists_to_video(
reporter: Option<&impl ProgressReporterTrait>,
playlists: &[&Path],
output_path: &Path,
) -> Result<(), String> {
let mut segments = Vec::new();
for (i, playlist) in playlists.iter().enumerate() {
let video_path = output_path.with_extension(format!("{}.mp4", i));
playlist_to_video(reporter, playlist, &video_path, None).await?;
segments.push(video_path);
}
super::general::concat_videos(reporter, &segments, output_path).await?;
// clean up segments
for segment in segments {
let _ = tokio::fs::remove_file(segment).await;
}
Ok(())
}
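
A similar hedged sketch for the new playlist_to_video helper (the input and output paths are placeholders; the call shape mirrors the recorder changes further down, which pass None for both the reporter and the range):

use std::path::Path;
use crate::progress::progress_reporter::ProgressReporter;

async fn clip_example() -> Result<(), String> {
    // Turn a recorded HLS playlist (fMP4 or TS) into a single MP4; a None range keeps the whole recording.
    crate::ffmpeg::playlist::playlist_to_video(
        None::<&ProgressReporter>,
        Path::new("cache/playlist.m3u8"), // placeholder input path
        Path::new("clips/output.mp4"),    // placeholder output path
        None,
    )
    .await
}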

View File

@@ -1,4 +1,7 @@
use std::fmt::{self, Display};
use std::{
fmt::{self, Display},
path::PathBuf,
};
use crate::{
config::Config,
@@ -35,7 +38,7 @@ use crate::{
},
AccountInfo,
},
progress::progress_manager::Event,
http_server::websocket,
recorder::{
bilibili::{
client::{QrInfo, QrStatus},
@@ -48,44 +51,18 @@ use crate::{
recorder_manager::{ClipRangeParams, RecorderList},
state::State,
};
use axum::extract::Query;
use axum::{
body::Body,
extract::{DefaultBodyLimit, Json, Multipart, Path},
http::{Request, StatusCode},
middleware::{self, Next},
response::{IntoResponse, Response, Sse},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Router,
};
use axum::{extract::Query, response::sse};
use futures::stream::{self, Stream};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tower_http::cors::{Any, CorsLayer};
use tower_http::services::ServeDir;
// Middleware to add keep-alive headers to all responses
async fn add_keep_alive_headers(request: Request<Body>, next: Next) -> Response {
let uri_path = request.uri().path().to_string();
let mut response = next.run(request).await;
// Skip keep-alive for streaming endpoints that might not work well with it
let should_skip_keepalive = uri_path.starts_with("/api/sse")
|| uri_path.starts_with("/hls/")
|| uri_path.contains(".m3u8")
|| uri_path.contains(".ts");
if !should_skip_keepalive {
// Add Connection: keep-alive header for regular HTTP responses
response.headers_mut().insert(
axum::http::header::CONNECTION,
axum::http::HeaderValue::from_static("keep-alive"),
);
}
response
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ApiResponse<T> {
@@ -1650,67 +1627,6 @@ async fn handler_hls(
Ok(response)
}
// Helper to escape strings for SSE payloads
fn escape_sse_string(s: &str) -> String {
s.replace('\\', "\\\\")
.replace('\n', "\\n")
.replace('\r', "\\r")
.replace('"', "\\\"")
}
async fn handler_sse(
state: axum::extract::State<State>,
) -> Sse<impl Stream<Item = Result<sse::Event, axum::Error>>> {
let rx = state.progress_manager.subscribe();
let stream = stream::unfold(rx, move |mut rx| async move {
match rx.recv().await {
Ok(event) => {
let sse_event = match event {
Event::ProgressUpdate { id, content } => {
sse::Event::default().event("progress-update").data(format!(
r#"{{"id":"{}","content":"{}"}}"#,
id,
escape_sse_string(&content)
))
}
Event::ProgressFinished {
id,
success,
message,
} => sse::Event::default()
.event("progress-finished")
.data(format!(
r#"{{"id":"{}","success":{},"message":"{}"}}"#,
id,
success,
escape_sse_string(&message)
)),
Event::DanmuReceived { room, ts, content } => sse::Event::default()
.event(format!("danmu:{}", room))
.data(format!(
r#"{{"ts":"{}","content":"{}"}}"#,
ts,
escape_sse_string(&content)
)),
};
Some((Ok(sse_event), rx))
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// Skip dropped events and continue
Some((Ok(sse::Event::default().event("keep-alive").data("")), rx))
}
}
});
Sse::new(stream).keep_alive(
sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(30))
.text("keep-alive"),
)
}
const MAX_BODY_SIZE: usize = 10 * 1024 * 1024 * 1024;
pub async fn start_api_server(state: State) {
@@ -1886,14 +1802,15 @@ pub async fn start_api_server(state: State) {
.route("/api/upload_file", post(handler_upload_file))
.route("/api/image/:video_id", get(handler_image_base64))
.route("/hls/*uri", get(handler_hls))
.route("/api/sse", get(handler_sse))
.nest_service("/output", ServeDir::new(output_path))
.nest_service("/cache", ServeDir::new(cache_path));
let websocket_layer = websocket::create_websocket_server(state.clone()).await;
let router = app
.layer(websocket_layer)
.layer(cors)
.layer(DefaultBodyLimit::max(MAX_BODY_SIZE))
.layer(middleware::from_fn(add_keep_alive_headers))
.with_state(state);
let addr = "0.0.0.0:3000";

View File

@@ -0,0 +1,2 @@
pub mod api_server;
pub mod websocket;

View File

@@ -0,0 +1,101 @@
use serde_json::{json, Value};
use socketioxide::{
extract::{Data, SocketRef},
layer::SocketIoLayer,
SocketIo,
};
use tokio::sync::broadcast;
use crate::progress::progress_manager::Event;
use crate::state::State;
pub async fn create_websocket_server(state: State) -> SocketIoLayer {
let (layer, io) = SocketIo::new_layer();
// Clone the state for the namespace handler
let state_clone = state.clone();
io.ns("/ws", move |socket: SocketRef| {
let state = state_clone.clone();
// Subscribe to progress events
let mut rx = state.progress_manager.subscribe();
// Spawn a task to handle progress events for this socket
let socket_clone = socket.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let (event_type, message) = match event {
Event::ProgressUpdate { id, content } => (
"progress",
json!({
"event": "progress-update",
"data": {
"id": id,
"content": content
}
}),
),
Event::ProgressFinished {
id,
success,
message,
} => (
"progress",
json!({
"event": "progress-finished",
"data": {
"id": id,
"success": success,
"message": message
}
}),
),
Event::DanmuReceived { room, ts, content } => (
"danmu",
json!({
"event": "danmu-received",
"data": {
"room": room,
"ts": ts,
"content": content
}
}),
),
};
if let Err(e) = socket_clone.emit(event_type, &message) {
log::warn!("Failed to emit progress event to WebSocket client: {}", e);
break;
}
}
Err(broadcast::error::RecvError::Closed) => {
log::info!("Progress channel closed, stopping WebSocket progress stream");
break;
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!("WebSocket client lagged, skipped {} events", skipped);
}
}
}
});
// Handle client messages
socket.on("message", |socket: SocketRef, Data::<Value>(data)| {
log::debug!("Received WebSocket message: {:?}", data);
// Echo back the message for testing
socket.emit("echo", &data).ok();
});
// Handle client disconnect
socket.on_disconnect(|socket: SocketRef| {
log::info!("WebSocket client disconnected: {}", socket.id);
});
log::info!("WebSocket client connected: {}", socket.id);
});
layer
}

View File

@@ -682,6 +682,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(v) => log::info!("Checked ffmpeg version: {v}"),
}
http_server::start_api_server(state).await;
http_server::api_server::start_api_server(state).await;
Ok(())
}

View File

@@ -729,12 +729,6 @@ impl BiliRecorder {
}
}
log::debug!(
"[{}]Segment buffers: {}",
self.room_id,
segment_buffers.len()
);
for buffer in segment_buffers {
if buffer.sequence <= latest_sequence {
continue;
@@ -1268,7 +1262,6 @@ impl super::Recorder for BiliRecorder {
let mut v: Vec<u8> = Vec::new();
playlist.write_to(&mut v).unwrap();
let m3u8_content: &str = std::str::from_utf8(&v).unwrap();
let is_fmp4 = m3u8_content.contains("#EXT-X-MAP:URI=");
tokio::fs::write(&m3u8_index_file_path, m3u8_content).await?;
log::info!(
"[{}]M3U8 index file generated: {}",
@@ -1277,13 +1270,11 @@ impl super::Recorder for BiliRecorder {
);
// generate a tmp clip file
let clip_file_path = format!("{}/{}", work_dir, "tmp.mp4");
if let Err(e) = crate::ffmpeg::clip_from_m3u8(
if let Err(e) = crate::ffmpeg::playlist::playlist_to_video(
None::<&crate::progress::progress_reporter::ProgressReporter>,
is_fmp4,
Path::new(&m3u8_index_file_path),
Path::new(&clip_file_path),
None,
false,
)
.await
{

View File

@@ -842,13 +842,11 @@ impl Recorder for DouyinRecorder {
tokio::fs::write(&m3u8_index_file_path, m3u8_content).await?;
// generate a tmp clip file
let clip_file_path = format!("{}/{}", work_dir, "tmp.mp4");
if let Err(e) = crate::ffmpeg::clip_from_m3u8(
if let Err(e) = crate::ffmpeg::playlist::playlist_to_video(
None::<&crate::progress::progress_reporter::ProgressReporter>,
false,
Path::new(&m3u8_index_file_path),
Path::new(&clip_file_path),
None,
false,
)
.await
{

View File

@@ -4,7 +4,7 @@ use crate::database::recorder::RecorderRow;
use crate::database::video::VideoRow;
use crate::database::{account::AccountRow, record::RecordRow};
use crate::database::{Database, DatabaseError};
use crate::ffmpeg::{clip_from_m3u8, encode_video_danmu, Range};
use crate::ffmpeg::{encode_video_danmu, transcode, Range};
use crate::progress::progress_reporter::{EventEmitter, ProgressReporter};
use crate::recorder::bilibili::{BiliRecorder, BiliRecorderOptions};
use crate::recorder::danmu::DanmuEntry;
@@ -431,61 +431,43 @@ impl RecorderManager {
clip_file: PathBuf,
params: &ClipRangeParams,
) -> Result<PathBuf, RecorderManagerError> {
let range_m3u8 = format!(
"{}/{}/{}/playlist.m3u8",
params.platform, params.room_id, params.live_id
);
let manifest_content = self.handle_hls_request(&range_m3u8).await?;
let mut manifest_content = String::from_utf8(manifest_content)
.map_err(|e| RecorderManagerError::ClipError { err: e.to_string() })?;
// if manifest is for stream, replace EXT-X-PLAYLIST-TYPE:EVENT to EXT-X-PLAYLIST-TYPE:VOD, and add #EXT-X-ENDLIST
if manifest_content.contains("#EXT-X-PLAYLIST-TYPE:EVENT") {
manifest_content =
manifest_content.replace("#EXT-X-PLAYLIST-TYPE:EVENT", "#EXT-X-PLAYLIST-TYPE:VOD");
manifest_content += "\n#EXT-X-ENDLIST\n";
}
let is_fmp4 = manifest_content.contains("#EXT-X-MAP:URI=");
let cache_path = self.config.read().await.cache.clone();
let cache_path = Path::new(&cache_path);
let random_filename = format!("manifest_{}.m3u8", uuid::Uuid::new_v4());
let tmp_manifest_file_path = cache_path
.join(&params.platform)
let playlist_path = cache_path
.join(params.platform.clone())
.join(params.room_id.to_string())
.join(&params.live_id)
.join(random_filename);
.join(params.live_id.clone())
.join("playlist.m3u8");
// Write manifest content to temporary file
tokio::fs::write(&tmp_manifest_file_path, manifest_content.as_bytes())
if !playlist_path.exists() {
log::error!("Playlist file not found: {}", playlist_path.display());
return Err(RecorderManagerError::ClipError {
err: "Playlist file not found".to_string(),
});
}
crate::ffmpeg::playlist::playlist_to_video(
reporter,
&playlist_path,
&clip_file,
params.range.clone(),
)
.await
.map_err(|e| RecorderManagerError::ClipError { err: e.to_string() })?;
if let Err(e) = clip_from_m3u8(
reporter,
is_fmp4,
&tmp_manifest_file_path,
&clip_file,
params.range.as_ref(),
params.fix_encoding,
)
.await
{
log::error!("Failed to generate clip file: {e}");
if params.fix_encoding {
// transcode clip_file
let tmp_clip_file = clip_file.with_extension("tmp.mp4");
if let Err(e) = transcode(reporter, &clip_file, &tmp_clip_file, false).await {
log::error!("Failed to transcode clip file: {e}");
return Err(RecorderManagerError::ClipError { err: e.to_string() });
}
// remove temp file
let _ = tokio::fs::remove_file(tmp_manifest_file_path).await;
// remove clip_file
let _ = tokio::fs::remove_file(&clip_file).await;
// check clip_file exists
if !clip_file.exists() {
log::error!("Clip file not found: {}", clip_file.display());
return Err(RecorderManagerError::ClipError {
err: "Clip file not found".into(),
});
// rename tmp_clip_file to clip_file
let _ = tokio::fs::rename(tmp_clip_file, &clip_file).await;
}
if !params.danmu {
@@ -890,8 +872,16 @@ impl RecorderManager {
log::info!("Concat playlists: {playlists:?}");
log::info!("Output path: {output_path:?}");
let owned_path_bufs: Vec<std::path::PathBuf> =
playlists.iter().map(std::path::PathBuf::from).collect();
let playlists_refs: Vec<&std::path::Path> = owned_path_bufs
.iter()
.map(std::path::PathBuf::as_path)
.collect();
if let Err(e) =
crate::ffmpeg::concat_multiple_playlist(reporter, playlists, Path::new(&output_path))
crate::ffmpeg::playlist::playlists_to_video(reporter, &playlists_refs, &output_path)
.await
{
log::error!("Failed to concat playlists: {e}");

View File

@@ -1,11 +1,5 @@
<script lang="ts">
import {
invoke,
TAURI_ENV,
ENDPOINT,
listen,
onConnectionRestore,
} from "../invoker";
import { invoke, TAURI_ENV, ENDPOINT, listen } from "../invoker";
import { Upload, X, CheckCircle } from "lucide-svelte";
import { createEventDispatcher, onDestroy } from "svelte";
import { open } from "@tauri-apps/plugin-dialog";
@@ -120,11 +114,6 @@
}
}
// Register connection-restore callback
if (!TAURI_ENV) {
onConnectionRestore(checkTaskStatus);
}
onDestroy(() => {
progressUpdateListener?.then((fn) => fn());
progressFinishedListener?.then((fn) => fn());

View File

@@ -52,25 +52,42 @@
console.log("Saved start and end", start + focus_start, end + focus_start);
}
async function loadGlobalOffset(url: string) {
async function load_metadata(url: string) {
let offset = 0;
let is_fmp4 = false;
const response = await fetch(url);
const m3u8Content = await response.text();
// extract offset from m3u8
const firstSegmentDatetime = m3u8Content
.split("\n")
.find((line) => line.startsWith("#EXT-X-PROGRAM-DATE-TIME:"));
if (firstSegmentDatetime) {
if (global_offset == 0) {
const date_str = firstSegmentDatetime.replace(
"#EXT-X-PROGRAM-DATE-TIME:",
""
);
global_offset = new Date(date_str).getTime() / 1000;
}
offset = new Date(date_str).getTime() / 1000;
} else {
if (global_offset == 0) {
global_offset = parseInt(live_id) / 1000;
offset = parseInt(live_id) / 1000;
}
// check if fmp4 live
if (m3u8Content.includes("#EXT-X-MAP:URI=")) {
is_fmp4 = true;
}
return {
offset,
is_fmp4,
};
}
function createMasterPlaylist(mediaPlaylistUrl: string) {
return `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=10000000,CODECS="avc1.64002a,mp4a.40.2"
${mediaPlaylistUrl}`;
}
function tauriNetworkPlugin(uri, requestType, progressUpdated) {
@@ -228,11 +245,24 @@
});
try {
const url = `${ENDPOINT ? ENDPOINT : window.location.origin}/hls/${platform}/${room_id}/${live_id}/playlist.m3u8?start=${focus_start}&end=${focus_end}`;
let direct_url = `${ENDPOINT ? ENDPOINT : window.location.origin}/hls/${platform}/${room_id}/${live_id}/playlist.m3u8?start=${focus_start}&end=${focus_end}`;
if (!TAURI_ENV) {
await loadGlobalOffset(url);
const { offset, is_fmp4 } = await load_metadata(direct_url);
global_offset = offset;
if (is_fmp4) {
let master_url = createMasterPlaylist(direct_url);
let blob = new Blob([master_url], {
type: "application/vnd.apple.mpegurl",
});
master_url = URL.createObjectURL(blob);
await player.load(master_url);
} else {
await player.load(direct_url);
}
await player.load(url);
} else {
await player.load(direct_url);
}
// This runs if the asynchronous load is successful.
console.log("The video has now been loaded!");
} catch (error) {
@@ -383,11 +413,15 @@
}
// listen to danmaku event
await listen("danmu:" + room_id, (event: { payload: DanmuEntry }) => {
await listen("danmu", (event: { payload: DanmuEntry }) => {
if (global_offset == 0) {
return;
}
if (event.payload.room != room_id) {
return;
}
if (event.payload.ts < global_offset * 1000) {
log.error("invalid danmu ts:", event.payload.ts, global_offset);
return;

View File

@@ -5,6 +5,7 @@ import { convertFileSrc as tauri_convert } from "@tauri-apps/api/core";
import { listen as tauri_listen } from "@tauri-apps/api/event";
import { open as tauri_open } from "@tauri-apps/plugin-shell";
import { onOpenUrl as tauri_onOpenUrl } from "@tauri-apps/plugin-deep-link";
import { io, Socket } from "socket.io-client";
declare global {
interface Window {
@@ -171,60 +172,80 @@ async function get_cover(coverType: string, coverPath: string) {
return `${ENDPOINT}/${coverType}/${coverPath}`;
}
let event_source: EventSource | null = null;
let reconnectTimeout: number | null = null;
const MAX_RECONNECT_ATTEMPTS = 5;
let reconnectAttempts = 0;
let socket: Socket | null = null;
// List of connection-restore callbacks
const connectionRestoreCallbacks: Array<() => void> = [];
// Map of Socket.IO event listeners
const eventListeners: Map<string, Array<(data: any) => void>> = new Map();
function createEventSource() {
if (TAURI_ENV) return;
if (event_source) {
event_source.close();
function createSocket() {
if (socket) {
socket.disconnect();
}
event_source = new EventSource(`${ENDPOINT}/api/sse`);
event_source.onopen = () => {
reconnectAttempts = 0;
// Build the Socket.IO URL
console.log("endpoint:", ENDPOINT);
const socketUrl = ENDPOINT;
socket = io(`${socketUrl}/ws`, {
transports: ["websocket", "polling"],
autoConnect: true,
reconnection: true,
});
// Fire the connection-restore callbacks
connectionRestoreCallbacks.forEach((callback) => {
socket.on("connect", () => {
console.log("[Socket.IO] Connected to server");
});
socket.on("disconnect", (reason) => {
console.log("[Socket.IO] Disconnected from server:", reason);
});
socket.on("connect_error", (error) => {
console.error("[Socket.IO] Connection error:", error);
});
// Listen for events pushed by the server
socket.on("progress", (data) => {
const eventType = data.event || "message";
// Dispatch to the listeners registered for this event
const listeners = eventListeners.get(eventType);
if (listeners) {
listeners.forEach((callback) => {
try {
callback();
callback({
type: eventType,
payload: data.data,
});
} catch (e) {
console.error("[SSE] Connection restore callback error:", e);
console.error(
`[Socket.IO] Event listener error for ${eventType}:`,
e
);
}
});
};
event_source.onerror = (error) => {
// Only reconnect when the connection is actually closed
if (
event_source.readyState === EventSource.CLOSED &&
reconnectAttempts < MAX_RECONNECT_ATTEMPTS
) {
reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 10000);
reconnectTimeout = window.setTimeout(() => {
createEventSource();
}, delay);
} else {
console.error("[SSE] Max reconnection attempts reached, giving up");
}
};
}
});
// Register a connection-restore callback
function onConnectionRestore(callback: () => void) {
connectionRestoreCallbacks.push(callback);
socket.on("danmu", (data) => {
// Dispatch to the registered listeners
const listeners = eventListeners.get("danmu");
if (listeners) {
listeners.forEach((callback) => {
try {
callback({
type: "danmu",
payload: data.data,
});
} catch (e) {
console.error(`[Socket.IO] Event listener error for danmu:`, e);
}
});
}
});
}
if (!TAURI_ENV) {
createEventSource();
createSocket();
}
async function listen<T>(event: string, callback: (data: any) => void) {
@@ -232,13 +253,26 @@ async function listen<T>(event: string, callback: (data: any) => void) {
return await tauri_listen(event, callback);
}
event_source.addEventListener(event, (event_data) => {
const data = JSON.parse(event_data.data);
callback({
type: event,
payload: data,
});
});
// Add the listener to the event map
if (!eventListeners.has(event)) {
eventListeners.set(event, []);
}
eventListeners.get(event)!.push(callback);
// Return a cleanup function
return () => {
const listeners = eventListeners.get(event);
if (listeners) {
const index = listeners.indexOf(callback);
if (index > -1) {
listeners.splice(index, 1);
}
// Drop the event entry when no listeners remain
if (listeners.length === 0) {
eventListeners.delete(event);
}
}
};
}
async function open(url: string) {
@@ -273,6 +307,5 @@ export {
log,
close_window,
onOpenUrl,
onConnectionRestore,
get_cover,
};

View File

@@ -886,6 +886,11 @@
resolved "https://registry.yarnpkg.com/@shikijs/vscode-textmate/-/vscode-textmate-10.0.2.tgz#a90ab31d0cc1dfb54c66a69e515bf624fa7b2224"
integrity sha512-83yeghZ2xxin3Nj8z1NMd/NCuca+gsYXswywDy5bHvwlWL8tpTQmzGeUuHd9FC3E/SBEMvzJRwWEOz5gGes9Qg==
"@socket.io/component-emitter@~3.1.0":
version "3.1.2"
resolved "https://registry.yarnpkg.com/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz#821f8442f4175d8f0467b9daf26e3a18e2d02af2"
integrity sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==
"@sveltejs/vite-plugin-svelte-inspector@^1.0.4":
version "1.0.4"
resolved "https://registry.yarnpkg.com/@sveltejs/vite-plugin-svelte-inspector/-/vite-plugin-svelte-inspector-1.0.4.tgz#c99fcb73aaa845a3e2c0563409aeb3ee0b863add"
@@ -2193,6 +2198,13 @@ debug@^4.3.4, debug@^4.4.0:
dependencies:
ms "^2.1.3"
debug@~4.3.1, debug@~4.3.2:
version "4.3.7"
resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.7.tgz#87945b4151a011d76d95a198d7111c865c360a52"
integrity sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==
dependencies:
ms "^2.1.3"
decamelize@1.2.0, decamelize@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/decamelize/-/decamelize-1.2.0.tgz#f6534d15148269b20352e7bee26f501f9a191290"
@@ -2279,6 +2291,22 @@ emoji-regex@^9.2.2:
resolved "https://registry.yarnpkg.com/emoji-regex/-/emoji-regex-9.2.2.tgz#840c8803b0d8047f4ff0cf963176b32d4ef3ed72"
integrity sha512-L18DaJsXSUk2+42pv8mLs5jJT2hqFkFE4j21wOmgbUqsZ2hL72NsUU785g9RXgo3s0ZNgVl42TiHp3ZtOv/Vyg==
engine.io-client@~6.6.1:
version "6.6.3"
resolved "https://registry.yarnpkg.com/engine.io-client/-/engine.io-client-6.6.3.tgz#815393fa24f30b8e6afa8f77ccca2f28146be6de"
integrity sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==
dependencies:
"@socket.io/component-emitter" "~3.1.0"
debug "~4.3.1"
engine.io-parser "~5.2.1"
ws "~8.17.1"
xmlhttprequest-ssl "~2.1.1"
engine.io-parser@~5.2.1:
version "5.2.3"
resolved "https://registry.yarnpkg.com/engine.io-parser/-/engine.io-parser-5.2.3.tgz#00dc5b97b1f233a23c9398d0209504cf5f94d92f"
integrity sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==
entities@^4.5.0:
version "4.5.0"
resolved "https://registry.yarnpkg.com/entities/-/entities-4.5.0.tgz#5d268ea5e7113ec74c4d033b79ea5a35a488fb48"
@@ -3478,6 +3506,24 @@ simple-wcswidth@^1.0.1:
resolved "https://registry.yarnpkg.com/simple-wcswidth/-/simple-wcswidth-1.1.2.tgz#66722f37629d5203f9b47c5477b1225b85d6525b"
integrity sha512-j7piyCjAeTDSjzTSQ7DokZtMNwNlEAyxqSZeCS+CXH7fJ4jx3FuJ/mTW3mE+6JLs4VJBbcll0Kjn+KXI5t21Iw==
socket.io-client@^4.8.1:
version "4.8.1"
resolved "https://registry.yarnpkg.com/socket.io-client/-/socket.io-client-4.8.1.tgz#1941eca135a5490b94281d0323fe2a35f6f291cb"
integrity sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==
dependencies:
"@socket.io/component-emitter" "~3.1.0"
debug "~4.3.2"
engine.io-client "~6.6.1"
socket.io-parser "~4.2.4"
socket.io-parser@~4.2.4:
version "4.2.4"
resolved "https://registry.yarnpkg.com/socket.io-parser/-/socket.io-parser-4.2.4.tgz#c806966cf7270601e47469ddeec30fbdfda44c83"
integrity sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==
dependencies:
"@socket.io/component-emitter" "~3.1.0"
debug "~4.3.1"
sorcery@^0.11.0:
version "0.11.1"
resolved "https://registry.yarnpkg.com/sorcery/-/sorcery-0.11.1.tgz#7cac27ae9c9549b3cd1e4bb85317f7b2dc7b7e22"
@@ -4059,6 +4105,16 @@ wrappy@1:
resolved "https://registry.yarnpkg.com/wrappy/-/wrappy-1.0.2.tgz#b5243d8f3ec1aa35f1364605bc0d1036e30ab69f"
integrity sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==
ws@~8.17.1:
version "8.17.1"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.17.1.tgz#9293da530bb548febc95371d90f9c878727d919b"
integrity sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==
xmlhttprequest-ssl@~2.1.1:
version "2.1.2"
resolved "https://registry.yarnpkg.com/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz#e9e8023b3f29ef34b97a859f584c5e6c61418e23"
integrity sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==
y18n@^4.0.0:
version "4.0.3"
resolved "https://registry.yarnpkg.com/y18n/-/y18n-4.0.3.tgz#b5f259c82cd6e336921efd7bfd8bf560de9eeedf"