mirror of https://github.com/Xinrea/bili-shadowreplay.git
synced 2025-11-24 20:15:34 +08:00
fix: bilibili stream pathway not update (close #117)
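In brief: when the FMP4 header URL in the index playlist changes, the recorder now exits with a HeaderChanged error, sets a header_changed_recently flag, treats that flag like a forced update when deciding whether to re-resolve the live stream pathway, and retries after only 500 ms instead of the usual randomized back-off. Below is a self-contained sketch of that pattern (simplified, not code from this repository; everything except the flag handling is a placeholder):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

// Placeholder for the two atomic flags on BiliRecorder that this commit touches.
struct Flags {
    force_update: Arc<AtomicBool>,
    header_changed_recently: Arc<AtomicBool>,
}

impl Flags {
    // Mirrors the expanded `should_update_stream` condition: a recent header
    // change now forces the stream pathway to be re-resolved.
    fn should_update_stream(&self, stream_is_same: bool) -> bool {
        !stream_is_same
            || self.force_update.load(Ordering::Relaxed)
            || self.header_changed_recently.load(Ordering::Relaxed)
    }

    // Mirrors the retry loop: a header-change exit gets a short 500 ms back-off
    // instead of the normal randomized exponential delay.
    fn sleep_duration(&self, connection_fail_count: u32) -> Duration {
        if self.header_changed_recently.load(Ordering::Relaxed) {
            // Clear the flag and recover quickly, as in the patched loop.
            self.header_changed_recently.store(false, Ordering::Relaxed);
            Duration::from_millis(500)
        } else {
            // The real code adds a random 2-5 s component via rand::random();
            // a fixed base keeps this sketch dependency-free.
            Duration::from_secs(2 + 2_u64.pow(connection_fail_count))
        }
    }
}

fn main() {
    let flags = Flags {
        force_update: Arc::new(AtomicBool::new(false)),
        header_changed_recently: Arc::new(AtomicBool::new(false)),
    };

    // Simulate the record task exiting with RecorderError::HeaderChanged.
    flags.header_changed_recently.store(true, Ordering::Relaxed);

    assert!(flags.should_update_stream(true)); // pathway refresh is forced
    assert_eq!(flags.sleep_duration(1), Duration::from_millis(500)); // fast retry
    assert_eq!(flags.sleep_duration(1), Duration::from_secs(4)); // flag was cleared
    println!("header change forces a stream update and a 500 ms back-off");
}

The actual change is in the diff below.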
@@ -117,6 +117,9 @@ async fn setup_logging(log_dir: &Path) -> Result<(), Box<dyn std::error::Error>>
),
])?;

// logging current package version
log::info!("Current version: {}", env!("CARGO_PKG_VERSION"));

Ok(())
}

@@ -13,7 +13,6 @@ use crate::subtitle_generator::item_to_srt;

use super::danmu::{DanmuEntry, DanmuStorage};
use super::entry::TsEntry;
use std::path::Path;
use chrono::Utc;
use client::{BiliClient, BiliStream, RoomInfo, StreamType, UserInfo};
use danmu_stream::danmu_stream::DanmuStream;
@@ -22,11 +21,12 @@ use danmu_stream::DanmuMessageType;
use errors::BiliClientError;
use m3u8_rs::{Playlist, QuotedOrUnquoted, VariantStream};
use regex::Regex;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{broadcast, Mutex, RwLock};
use tokio::task::JoinHandle;
use url::Url;
@@ -70,9 +70,10 @@ pub struct BiliRecorder {
enabled: Arc<RwLock<bool>>,
last_segment_offset: Arc<RwLock<Option<i64>>>, // offset of the last segment processed in the previous pass
current_header_url: Arc<RwLock<Option<String>>>, // stores the current header URL
header_changed_recently: Arc<AtomicBool>, // marks whether the header changed recently
header_changed_recently: Arc<AtomicBool>, // marks whether the header changed recently
danmu_task: Arc<Mutex<Option<JoinHandle<()>>>>,
record_task: Arc<Mutex<Option<JoinHandle<()>>>>,
master_manifest: Arc<RwLock<Option<String>>>,
}

impl From<DatabaseError> for super::errors::RecorderError {
@@ -147,6 +148,7 @@ impl BiliRecorder {
header_changed_recently: Arc::new(AtomicBool::new(false)),
danmu_task: Arc::new(Mutex::new(None)),
record_task: Arc::new(Mutex::new(None)),
master_manifest: Arc::new(RwLock::new(None)),
};
log::info!("Recorder for room {} created.", options.room_id);
Ok(recorder)
@@ -161,7 +163,6 @@ impl BiliRecorder {
*self.last_segment_offset.write().await = None;
*self.current_header_url.write().await = None;
self.header_changed_recently.store(false, Ordering::Relaxed);

}

async fn should_record(&self) -> bool {
@@ -267,11 +268,13 @@ impl BiliRecorder {
return true;
}

let master_manifest =
m3u8_rs::parse_playlist_res(master_manifest.as_ref().unwrap().as_bytes())
.map_err(|_| super::errors::RecorderError::M3u8ParseFailed {
content: master_manifest.as_ref().unwrap().clone(),
});
let master_manifest = master_manifest.unwrap();
*self.master_manifest.write().await = Some(master_manifest.clone());

let master_manifest = m3u8_rs::parse_playlist_res(master_manifest.as_bytes())
.map_err(|_| super::errors::RecorderError::M3u8ParseFailed {
content: master_manifest.clone(),
});
if master_manifest.is_err() {
log::error!(
"[{}]Parse master manifest failed: {}",
@@ -312,6 +315,8 @@ impl BiliRecorder {

let variant = variant.unwrap();

log::info!("Variant: {:?}", variant);

let new_stream = self.stream_from_variant(variant).await;
if new_stream.is_err() {
log::error!(
@@ -325,26 +330,13 @@ impl BiliRecorder {
let stream = new_stream.unwrap();

let should_update_stream = self.live_stream.read().await.is_none()
|| !self
.live_stream
.read()
.await
.as_ref()
.unwrap()
.is_same(&stream)
|| self.force_update.load(Ordering::Relaxed);
|| self.force_update.load(Ordering::Relaxed)
|| self.header_changed_recently.load(Ordering::Relaxed);

if should_update_stream {
log::info!(
"[{}]Update to a new stream: {:?} => {}",
self.room_id,
self.live_stream.read().await.clone(),
stream
);

self.force_update.store(false, Ordering::Relaxed);

let new_stream = self.fetch_real_stream(stream).await;
let new_stream = self.fetch_real_stream(&stream).await;
if new_stream.is_err() {
log::error!(
"[{}]Fetch real stream failed: {}",
@@ -357,6 +349,13 @@ impl BiliRecorder {
let new_stream = new_stream.unwrap();
*self.live_stream.write().await = Some(new_stream);
*self.last_update.write().await = Utc::now().timestamp();

log::info!(
"[{}]Update to a new stream: {:?} => {}",
self.room_id,
self.live_stream.read().await.clone(),
stream
);
}

true
@@ -463,6 +462,10 @@ impl BiliRecorder {
}
Err(e) => {
log::error!("Failed fetching index content from {}", stream.index());
log::error!(
"Master manifest: {}",
self.master_manifest.read().await.as_ref().unwrap()
);
Err(super::errors::RecorderError::BiliClientError { err: e })
}
}
@@ -474,7 +477,7 @@ impl BiliRecorder {
return Err(super::errors::RecorderError::NoStreamAvailable);
}
let stream = stream.unwrap();

let index_content = self
.client
.read()
@@ -489,7 +492,7 @@ impl BiliRecorder {
url: stream.index(),
});
}

let mut header_url = String::from("");
let re = Regex::new(r"h.*\.m4s").unwrap();
if let Some(captures) = re.captures(&index_content) {
@@ -498,13 +501,13 @@ impl BiliRecorder {
if header_url.is_empty() {
log::warn!("Parse header url failed: {}", index_content);
}

Ok(header_url)
}

async fn fetch_real_stream(
&self,
stream: BiliStream,
stream: &BiliStream,
) -> Result<BiliStream, super::errors::RecorderError> {
let index_content = self
.client
@@ -513,7 +516,9 @@ impl BiliRecorder {
.get_index_content(&self.account, &stream.index())
.await?;
if index_content.is_empty() {
return Err(super::errors::RecorderError::InvalidStream { stream });
return Err(super::errors::RecorderError::InvalidStream {
stream: stream.clone(),
});
}
if index_content.contains("Not Found") {
return Err(super::errors::RecorderError::IndexNotFound {
@@ -524,7 +529,7 @@ impl BiliRecorder {
// this index content provides another m3u8 url
// example: https://765b047cec3b099771d4b1851136046f.v.smtcdns.net/d1--cn-gotcha204-3.bilivideo.com/live-bvc/246284/live_1323355750_55526594/index.m3u8?expires=1741318366&len=0&oi=1961017843&pt=h5&qn=10000&trid=1007049a5300422eeffd2d6995d67b67ca5a&sigparams=cdn,expires,len,oi,pt,qn,trid&cdn=cn-gotcha204&sign=7ef1241439467ef27d3c804c1eda8d4d&site=1c89ef99adec13fab3a3592ee4db26d3&free_type=0&mid=475210&sche=ban&bvchls=1&trace=16&isp=ct&rg=East&pv=Shanghai&source=puv3_onetier&p2p_type=-1&score=1&suffix=origin&deploy_env=prod&flvsk=e5c4d6fb512ed7832b706f0a92f7a8c8&sk=246b3930727a89629f17520b1b551a2f&pp=rtmp&hot_cdn=57345&origin_bitrate=657300&sl=1&info_source=cache&vd=bc&src=puv3&order=1&TxLiveCode=cold_stream&TxDispType=3&svr_type=live_oc&tencent_test_client_ip=116.226.193.243&dispatch_from=OC_MGR61.170.74.11&utime=1741314857497
let new_url = index_content.lines().last().unwrap();

// extract host: cn-gotcha204-3.bilivideo.com
let host = new_url.split('/').nth(2).unwrap_or_default();
let extra = new_url.split('?').nth(1).unwrap_or_default();
@@ -536,11 +541,11 @@ impl BiliRecorder {
.collect::<Vec<&str>>()
.join("/")
+ "/";

let new_stream = BiliStream::new(StreamType::FMP4, base_url.as_str(), host, extra);
return Box::pin(self.fetch_real_stream(new_stream)).await;
return Box::pin(self.fetch_real_stream(&new_stream)).await;
}
Ok(stream)
Ok(stream.clone())
}

async fn get_work_dir(&self, live_id: &str) -> String {
@@ -560,10 +565,17 @@ impl BiliRecorder {
}
let current_stream = current_stream.unwrap();
let parsed = self.get_playlist().await;
if parsed.is_err() {
self.force_update.store(true, Ordering::Relaxed);
return Err(parsed.err().unwrap());
}

let playlist = parsed.unwrap();

let mut timestamp: i64 = self.live_id.read().await.parse::<i64>().unwrap_or(0);
let mut work_dir;
let mut is_first_record = false;

// Check header for FMP4 streams
if current_stream.format == StreamType::FMP4 {
// Get url from EXT-X-MAP
@@ -604,16 +616,13 @@ impl BiliRecorder {
return Err(super::errors::RecorderError::HeaderChanged);
}

// Save the new header URL
*self.current_header_url.write().await = Some(header_url.clone());

timestamp = Utc::now().timestamp_millis();
*self.live_id.write().await = timestamp.to_string();
work_dir = self.get_work_dir(timestamp.to_string().as_str()).await;
is_first_record = true;

let full_header_url = current_stream.ts_url(&header_url);

let file_name = header_url.split('/').next_back().unwrap();
let mut header = TsEntry {
url: file_name.to_string(),
@@ -623,69 +632,81 @@ impl BiliRecorder {
ts: timestamp,
is_header: true,
};

// Create work directory before download
tokio::fs::create_dir_all(&work_dir).await.map_err(|e| {
super::errors::RecorderError::IoError { err: e }
})?;

// Download header
match self
.client
.read()
.await
.download_ts(&full_header_url, &format!("{}/{}", work_dir, file_name))
.await
{
Ok(size) => {
if size == 0 {
log::error!("Download header failed: {}", full_header_url);

// Create work directory before download
tokio::fs::create_dir_all(&work_dir)
.await
.map_err(|e| super::errors::RecorderError::IoError { err: e })?;

// Download header
match self
.client
.read()
.await
.download_ts(&full_header_url, &format!("{}/{}", work_dir, file_name))
.await
{
Ok(size) => {
if size == 0 {
log::error!("Download header failed: {}", full_header_url);
// Clean up empty directory since header download failed
if let Err(cleanup_err) = tokio::fs::remove_dir_all(&work_dir).await {
log::warn!(
"Failed to cleanup empty work directory {}: {}",
work_dir,
cleanup_err
);
}
return Err(super::errors::RecorderError::InvalidStream {
stream: current_stream,
});
}
header.size = size;

// Now that download succeeded, create the record and setup stores
self.db
.add_record(
PlatformType::BiliBili,
timestamp.to_string().as_str(),
self.room_id,
&self.room_info.read().await.room_title,
self.cover.read().await.clone(),
None,
)
.await?;

let entry_store = EntryStore::new(&work_dir).await;
*self.entry_store.write().await = Some(entry_store);

// danmu file
let danmu_file_path = format!("{}{}", work_dir, "danmu.txt");
*self.danmu_storage.write().await =
DanmuStorage::new(&danmu_file_path).await;

self.entry_store
.write()
.await
.as_mut()
.unwrap()
.add_entry(header)
.await;

// Save the new header URL
*self.current_header_url.write().await = Some(header_url.clone());
}
Err(e) => {
log::error!("Download header failed: {}", e);
// Clean up empty directory since header download failed
if let Err(cleanup_err) = tokio::fs::remove_dir_all(&work_dir).await {
log::warn!("Failed to cleanup empty work directory {}: {}", work_dir, cleanup_err);
log::warn!(
"Failed to cleanup empty work directory {}: {}",
work_dir,
cleanup_err
);
}
return Err(super::errors::RecorderError::InvalidStream {
stream: current_stream,
});
return Err(e.into());
}
header.size = size;

// Now that download succeeded, create the record and setup stores
self.db
.add_record(
PlatformType::BiliBili,
timestamp.to_string().as_str(),
self.room_id,
&self.room_info.read().await.room_title,
self.cover.read().await.clone(),
None,
)
.await?;

let entry_store = EntryStore::new(&work_dir).await;
*self.entry_store.write().await = Some(entry_store);

// danmu file
let danmu_file_path = format!("{}{}", work_dir, "danmu.txt");
*self.danmu_storage.write().await = DanmuStorage::new(&danmu_file_path).await;

self.entry_store
.write()
.await
.as_mut()
.unwrap()
.add_entry(header)
.await;
}
Err(e) => {
log::error!("Download header failed: {}", e);
// Clean up empty directory since header download failed
if let Err(cleanup_err) = tokio::fs::remove_dir_all(&work_dir).await {
log::warn!("Failed to cleanup empty work directory {}: {}", work_dir, cleanup_err);
}
return Err(e.into());
}
}
} else {
// Header exists and hasn't changed, use existing work_dir
work_dir = self.get_work_dir(self.live_id.read().await.as_str()).await;
@@ -700,10 +721,10 @@ impl BiliRecorder {
is_first_record = true;
}
}

match parsed {
Ok(Playlist::MasterPlaylist(pl)) => log::debug!("Master playlist:\n{:?}", pl),
Ok(Playlist::MediaPlaylist(pl)) => {

match playlist {
Playlist::MasterPlaylist(pl) => log::debug!("Master playlist:\n{:?}", pl),
Playlist::MediaPlaylist(pl) => {
let mut new_segment_fetched = false;
let last_sequence = self
.entry_store
@@ -733,12 +754,22 @@ impl BiliRecorder {
}
segment_offsets.push(seg_offset);
}

// Extract stream start timestamp from header if available for FMP4
let stream_start_timestamp = if current_stream.format == StreamType::FMP4 {
if let Some(header_entry) = self.entry_store.read().await.as_ref().and_then(|store| store.get_header()) {
if let Some(header_entry) = self
.entry_store
.read()
.await
.as_ref()
.and_then(|store| store.get_header())
{
// Parse timestamp from header filename like "h1753276580.m4s"
if let Some(timestamp_str) = header_entry.url.strip_prefix("h").and_then(|s| s.strip_suffix(".m4s")) {
if let Some(timestamp_str) = header_entry
.url
.strip_prefix("h")
.and_then(|s| s.strip_suffix(".m4s"))
{
timestamp_str.parse::<i64>().unwrap_or(0)
} else {
0
@@ -766,70 +797,82 @@ impl BiliRecorder {
}

// Calculate precise timestamp from stream start + BILI-AUX offset for FMP4
let ts_mili = if current_stream.format == StreamType::FMP4 && stream_start_timestamp > 0 && i < segment_offsets.len() {
let ts_mili = if current_stream.format == StreamType::FMP4
&& stream_start_timestamp > 0
&& i < segment_offsets.len()
{
let seg_offset = segment_offsets[i];

stream_start_timestamp * 1000 + seg_offset
} else {
// Fallback to current time if parsing fails or not FMP4
Utc::now().timestamp_millis()
};

// encode segment offset into filename
let file_name = ts.uri.split('/').next_back().unwrap_or(&ts.uri);
let ts_length = pl.target_duration as f64;

// Calculate precise duration from BILI-AUX offsets for FMP4
let precise_length_from_aux = if current_stream.format == StreamType::FMP4 && i < segment_offsets.len() {
let current_offset = segment_offsets[i];

// Get the previous offset for duration calculation
let prev_offset = if i > 0 {
// Use previous segment in current M3U8
Some(segment_offsets[i - 1])
} else {
// Use saved last offset from previous M3U8 processing
last_offset
};

if let Some(prev) = prev_offset {
let duration_ms = current_offset - prev;
if duration_ms > 0 {
Some(duration_ms as f64 / 1000.0) // Convert ms to seconds
let precise_length_from_aux =
if current_stream.format == StreamType::FMP4 && i < segment_offsets.len() {
let current_offset = segment_offsets[i];

// Get the previous offset for duration calculation
let prev_offset = if i > 0 {
// Use previous segment in current M3U8
Some(segment_offsets[i - 1])
} else {
// Use saved last offset from previous M3U8 processing
last_offset
};

if let Some(prev) = prev_offset {
let duration_ms = current_offset - prev;
if duration_ms > 0 {
Some(duration_ms as f64 / 1000.0) // Convert ms to seconds
} else {
None
}
} else {
// No previous offset available, use target duration
None
}
} else {
// No previous offset available, use target duration
None
}
} else {
None
};
};
let client = self.client.clone();
let mut retry = 0;
let mut work_dir_created_for_non_fmp4 = false;

// For non-FMP4 streams, create record on first successful ts download
if is_first_record && current_stream.format != StreamType::FMP4 {
// Create work directory before first ts download
tokio::fs::create_dir_all(&work_dir).await.map_err(|e| {
super::errors::RecorderError::IoError { err: e }
})?;
tokio::fs::create_dir_all(&work_dir)
.await
.map_err(|e| super::errors::RecorderError::IoError { err: e })?;
work_dir_created_for_non_fmp4 = true;
}

loop {
if retry > 3 {
log::error!("Download ts failed after retry");

// Clean up empty directory if first ts download failed for non-FMP4
if is_first_record && current_stream.format != StreamType::FMP4 && work_dir_created_for_non_fmp4 {
if let Err(cleanup_err) = tokio::fs::remove_dir_all(&work_dir).await {
log::warn!("Failed to cleanup empty work directory {}: {}", work_dir, cleanup_err);
if is_first_record
&& current_stream.format != StreamType::FMP4
&& work_dir_created_for_non_fmp4
{
if let Err(cleanup_err) = tokio::fs::remove_dir_all(&work_dir).await
{
log::warn!(
"Failed to cleanup empty work directory {}: {}",
work_dir,
cleanup_err
);
}
}

break;
}
match client
@@ -841,19 +884,28 @@ impl BiliRecorder {
Ok(size) => {
if size == 0 {
log::error!("Segment with size 0, stream might be corrupted");

// Clean up empty directory if first ts download failed for non-FMP4
if is_first_record && current_stream.format != StreamType::FMP4 && work_dir_created_for_non_fmp4 {
if let Err(cleanup_err) = tokio::fs::remove_dir_all(&work_dir).await {
log::warn!("Failed to cleanup empty work directory {}: {}", work_dir, cleanup_err);
if is_first_record
&& current_stream.format != StreamType::FMP4
&& work_dir_created_for_non_fmp4
{
if let Err(cleanup_err) =
tokio::fs::remove_dir_all(&work_dir).await
{
log::warn!(
"Failed to cleanup empty work directory {}: {}",
work_dir,
cleanup_err
);
}
}

return Err(super::errors::RecorderError::InvalidStream {
stream: current_stream,
});
}

// Create record and setup stores on first successful download for non-FMP4
if is_first_record && current_stream.format != StreamType::FMP4 {
self.db
@@ -872,20 +924,31 @@ impl BiliRecorder {

// danmu file
let danmu_file_path = format!("{}{}", work_dir, "danmu.txt");
*self.danmu_storage.write().await = DanmuStorage::new(&danmu_file_path).await;

*self.danmu_storage.write().await =
DanmuStorage::new(&danmu_file_path).await;

is_first_record = false;
}

// Get precise duration - prioritize BILI-AUX for FMP4, fallback to ffprobe if needed
let precise_length = if let Some(aux_duration) = precise_length_from_aux {
let precise_length = if let Some(aux_duration) =
precise_length_from_aux
{
aux_duration
} else if current_stream.format != StreamType::FMP4 {
// For regular TS segments, use direct ffprobe
let file_path = format!("{}/{}", work_dir, file_name);
match crate::ffmpeg::get_segment_duration(std::path::Path::new(&file_path)).await {
match crate::ffmpeg::get_segment_duration(std::path::Path::new(
&file_path,
))
.await
{
Ok(duration) => {
log::debug!("Precise TS segment duration: {}s (original: {}s)", duration, ts_length);
log::debug!(
"Precise TS segment duration: {}s (original: {}s)",
duration,
ts_length
);
duration
}
Err(e) => {
@@ -913,23 +976,35 @@ impl BiliRecorder {
is_header: false,
})
.await;

// Update last offset for next segment calculation
if current_stream.format == StreamType::FMP4 && i < segment_offsets.len() {
if current_stream.format == StreamType::FMP4
&& i < segment_offsets.len()
{
last_offset = Some(segment_offsets[i]);
}

new_segment_fetched = true;
break;
}
Err(e) => {
retry += 1;
log::warn!("Download ts failed, retry {}: {}", retry, e);

// If this is the last retry and it's the first record for non-FMP4, clean up
if retry > 3 && is_first_record && current_stream.format != StreamType::FMP4 && work_dir_created_for_non_fmp4 {
if let Err(cleanup_err) = tokio::fs::remove_dir_all(&work_dir).await {
log::warn!("Failed to cleanup empty work directory {}: {}", work_dir, cleanup_err);
if retry > 3
&& is_first_record
&& current_stream.format != StreamType::FMP4
&& work_dir_created_for_non_fmp4
{
if let Err(cleanup_err) =
tokio::fs::remove_dir_all(&work_dir).await
{
log::warn!(
"Failed to cleanup empty work directory {}: {}",
work_dir,
cleanup_err
);
}
}
}
@@ -939,12 +1014,12 @@ impl BiliRecorder {

if new_segment_fetched {
*self.last_update.write().await = Utc::now().timestamp();

// Save the last offset for next M3U8 processing
if current_stream.format == StreamType::FMP4 {
*self.last_segment_offset.write().await = last_offset;
}

self.db
.update_record(
timestamp.to_string().as_str(),
@@ -978,10 +1053,6 @@ impl BiliRecorder {
}
}
}
Err(e) => {
self.force_update.store(true, Ordering::Relaxed);
return Err(e);
}
}

// check stream is nearly expired
@@ -1031,11 +1102,7 @@ impl BiliRecorder {
};

if let Some(entry_store) = self.entry_store.read().await.as_ref() {
entry_store.manifest(
!live_status || range.is_some(),
true,
range,
)
entry_store.manifest(!live_status || range.is_some(), true, range)
} else {
// Return empty manifest if entry_store is not initialized yet
"#EXTM3U\n#EXT-X-VERSION:3\n".to_string()
@@ -1083,26 +1150,31 @@ impl super::Recorder for BiliRecorder {
std::cmp::min(5, connection_fail_count + 1);
} else if let RecorderError::HeaderChanged = e {
// Mark that exit was triggered by header change
self_clone.header_changed_recently.store(true, Ordering::Relaxed);
self_clone
.header_changed_recently
.store(true, Ordering::Relaxed);
}
break;
}
}
}
*self_clone.is_recording.write().await = false;

// Check if exit was triggered by header change for faster recovery
let sleep_duration = if self_clone.header_changed_recently.load(Ordering::Relaxed) {
// Clear the flag after checking
self_clone.header_changed_recently.store(false, Ordering::Relaxed);
// Quick recovery for header change - only 500ms
Duration::from_millis(500)
} else {
// Normal random delay for other errors
let secs = rand::random::<u64>() % 4 + 2;
Duration::from_secs(secs + 2_u64.pow(connection_fail_count))
};

let sleep_duration =
if self_clone.header_changed_recently.load(Ordering::Relaxed) {
// Clear the flag after checking
self_clone
.header_changed_recently
.store(false, Ordering::Relaxed);
// Quick recovery for header change - only 500ms
Duration::from_millis(500)
} else {
// Normal random delay for other errors
let secs = rand::random::<u64>() % 4 + 2;
Duration::from_secs(secs + 2_u64.pow(connection_fail_count))
};

tokio::time::sleep(sleep_duration).await;
continue;
}
@@ -1196,7 +1268,11 @@ impl super::Recorder for BiliRecorder {
Ok(if live_id == *self.live_id.read().await {
// just return current cache content
match self.danmu_storage.read().await.as_ref() {
Some(storage) => storage.get_entries(self.first_segment_ts(live_id).await).await,
Some(storage) => {
storage
.get_entries(self.first_segment_ts(live_id).await)
.await
}
None => Vec::new(),
}
} else {
@@ -1214,7 +1290,9 @@ impl super::Recorder for BiliRecorder {
return Ok(Vec::new());
}
let storage = storage.unwrap();
storage.get_entries(self.first_segment_ts(live_id).await).await
storage
.get_entries(self.first_segment_ts(live_id).await)
.await
})
}

@@ -1222,7 +1300,10 @@ impl super::Recorder for BiliRecorder {
*self.live_id.read().await == live_id && *self.live_status.read().await
}

async fn get_archive_subtitle(&self, live_id: &str) -> Result<String, super::errors::RecorderError> {
async fn get_archive_subtitle(
&self,
live_id: &str,
) -> Result<String, super::errors::RecorderError> {
// read subtitle file under work_dir
let work_dir = self.get_work_dir(live_id).await;
let subtitle_file_path = format!("{}/{}", work_dir, "subtitle.srt");
@@ -1239,7 +1320,10 @@ impl super::Recorder for BiliRecorder {
Ok(subtitle_content)
}

async fn generate_archive_subtitle(&self, live_id: &str) -> Result<String, super::errors::RecorderError> {
async fn generate_archive_subtitle(
&self,
live_id: &str,
) -> Result<String, super::errors::RecorderError> {
// generate subtitle file under work_dir
let work_dir = self.get_work_dir(live_id).await;
let subtitle_file_path = format!("{}/{}", work_dir, "subtitle.srt");
@@ -1252,7 +1336,13 @@ impl super::Recorder for BiliRecorder {
log::info!("M3U8 index file generated: {}", m3u8_index_file_path);
// generate a tmp clip file
let clip_file_path = format!("{}/{}", work_dir, "tmp.mp4");
if let Err(e) = crate::ffmpeg::clip_from_m3u8(None::<&crate::progress_reporter::ProgressReporter>, Path::new(&m3u8_index_file_path), Path::new(&clip_file_path)).await {
if let Err(e) = crate::ffmpeg::clip_from_m3u8(
None::<&crate::progress_reporter::ProgressReporter>,
Path::new(&m3u8_index_file_path),
Path::new(&clip_file_path),
)
.await
{
return Err(super::errors::RecorderError::SubtitleGenerationFailed {
error: e.to_string(),
});
@@ -1260,7 +1350,17 @@ impl super::Recorder for BiliRecorder {
log::info!("Temp clip file generated: {}", clip_file_path);
// generate subtitle file
let config = self.config.read().await;
let result = crate::ffmpeg::generate_video_subtitle(None, Path::new(&clip_file_path), "whisper", &config.whisper_model, &config.whisper_prompt, &config.openai_api_key, &config.openai_api_endpoint, &config.whisper_language).await;
let result = crate::ffmpeg::generate_video_subtitle(
None,
Path::new(&clip_file_path),
"whisper",
&config.whisper_model,
&config.whisper_prompt,
&config.openai_api_key,
&config.openai_api_endpoint,
&config.whisper_language,
)
.await;
// write subtitle file
if let Err(e) = result {
return Err(super::errors::RecorderError::SubtitleGenerationFailed {
@@ -1269,7 +1369,12 @@ impl super::Recorder for BiliRecorder {
}
log::info!("Subtitle generated");
let result = result.unwrap();
let subtitle_content = result.subtitle_content.iter().map(item_to_srt).collect::<Vec<String>>().join("");
let subtitle_content = result
.subtitle_content
.iter()
.map(item_to_srt)
.collect::<Vec<String>>()
.join("");
subtitle_file.write_all(subtitle_content.as_bytes()).await?;
log::info!("Subtitle file written");
// remove tmp file

@@ -138,19 +138,6 @@ impl BiliStream {
}
})
}

pub fn is_same(&self, other: &BiliStream) -> bool {
// Extract live_id part from path (e.g., live_1848752274_71463808)
let get_live_id = |path: &str| {
path.split('/')
.find(|part| part.starts_with("live_"))
.unwrap_or("")
.to_string()
};
let self_live_id = get_live_id(&self.path);
let other_live_id = get_live_id(&other.path);
self_live_id == other_live_id
}
}

impl BiliClient {

@@ -212,7 +212,6 @@ impl EntryStore {
/// `vod` indicates the manifest is for stream or video.
/// `force_time` adds DATE-TIME tag for each entry.
pub fn manifest(&self, vod: bool, force_time: bool, range: Option<Range>) -> String {
log::debug!("Generate manifest for range: {:?} with vod: {} and force_time: {}", range, vod, force_time);
let mut m3u8_content = "#EXTM3U\n".to_string();
m3u8_content += "#EXT-X-VERSION:6\n";
m3u8_content += if vod {
@@ -240,12 +239,6 @@ impl EntryStore {
// Collect entries in range
let first_entry = self.entries.first().unwrap();
let first_entry_ts = first_entry.ts_seconds();
log::debug!("First entry ts: {}", first_entry_ts);
let last_entry = self.entries.last().unwrap();
let last_entry_ts = last_entry.ts_seconds();
log::debug!("Last entry ts: {}", last_entry_ts);
log::debug!("Full length: {}", last_entry_ts - first_entry_ts);
log::debug!("Range: {:?}", range);
let mut entries_in_range = vec![];
for e in &self.entries {
// ignore header, cause it's already in EXT-X-MAP
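For context on the fetch_real_stream hunks above: when the index request returns yet another m3u8 URL (like the long example URL in the diff), the recorder splits it into a host, a query string, and a base path, builds a new BiliStream, and recurses. A minimal sketch of that split on a shortened, hypothetical URL (not code from this repository; the base-path slicing is an assumption, since the hunk only shows the tail of that expression):

// Minimal sketch of the URL split in fetch_real_stream. The `host` and `extra`
// lines mirror the diff; deriving `base_url` by dropping the file name is an
// assumption.
fn main() {
    // Hypothetical URL with the same shape as the example in the diff.
    let new_url = "https://cdn.example.net/d1--cn-gotcha204-3.bilivideo.com/live-bvc/246284/live_1323355750_55526594/index.m3u8?expires=1741318366&qn=10000";

    let host = new_url.split('/').nth(2).unwrap_or_default();
    let extra = new_url.split('?').nth(1).unwrap_or_default();

    // Assumed base path: everything after the host, minus the query and file name.
    let path = new_url.split('?').next().unwrap_or_default();
    let mut parts: Vec<&str> = path.split('/').skip(3).collect();
    parts.pop(); // drop "index.m3u8"
    let base_url = parts.join("/") + "/";

    assert_eq!(host, "cdn.example.net");
    assert_eq!(extra, "expires=1741318366&qn=10000");
    assert_eq!(
        base_url,
        "d1--cn-gotcha204-3.bilivideo.com/live-bvc/246284/live_1323355750_55526594/"
    );
}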
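A note on the FMP4 timing hunks above: the segment timestamp is derived from the header filename (e.g. h1753276580.m4s gives the stream start in seconds) plus the per-segment BILI-AUX offset in milliseconds, and the segment duration comes from the difference between consecutive offsets. A minimal sketch with hypothetical values (not code from this repository):

// Illustrative sketch of the FMP4 timing math used above: stream start comes
// from the header filename, per-segment BILI-AUX offsets are in milliseconds,
// and durations are offset differences.
fn stream_start_from_header(header_file: &str) -> Option<i64> {
    header_file
        .strip_prefix('h')?
        .strip_suffix(".m4s")?
        .parse()
        .ok()
}

fn main() {
    // Hypothetical header name and offsets, shaped like the values in the diff.
    let header = "h1753276580.m4s";
    let offsets_ms: [i64; 3] = [0, 5000, 10020];

    let start_s = stream_start_from_header(header).unwrap_or(0);
    let mut prev: Option<i64> = None;
    for (i, off) in offsets_ms.iter().enumerate() {
        let ts_ms = start_s * 1000 + off; // absolute timestamp of segment i
        let duration_s = prev.map(|p| (off - p) as f64 / 1000.0); // None for the first segment
        println!("segment {i}: ts={ts_ms} ms, duration={duration_s:?} s");
        prev = Some(*off);
    }
}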