fix: multiple danmu task running (#215)

This commit is contained in:
Xinrea
2025-11-01 23:43:40 +08:00
committed by GitHub
parent 617a6a0b8e
commit 8bea9336ae
3 changed files with 92 additions and 57 deletions

View File

@@ -15,7 +15,7 @@ use crate::errors::RecorderError;
use crate::ffmpeg::VideoMetadata;
use crate::{core::HlsStream, events::RecorderEvent};
const UPDATE_TIMEOUT: Duration = Duration::from_secs(10);
const UPDATE_TIMEOUT: Duration = Duration::from_secs(20);
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
const PLAYLIST_FILE_NAME: &str = "playlist.m3u8";
const DOWNLOAD_RETRY: u32 = 3;
@@ -193,13 +193,19 @@ impl HlsRecorder {
// we need to remove the query parameters: 1.ts
let filename = segment.uri.split('?').next().unwrap_or(&segment.uri);
let segment_path = self.work_dir.join(filename);
let size = download(
let Ok(size) = download(
&self.client,
&segment_full_url,
&segment_path,
DOWNLOAD_RETRY,
)
.await?;
.await
else {
log::error!("Download failed: {:#?}", segment);
return Err(RecorderError::IoError(std::io::Error::other(
"Download failed",
)));
};
// check if the stream is changed
let segment_metadata = crate::ffmpeg::extract_video_metadata(&segment_path)

View File

@@ -243,34 +243,49 @@ impl BiliRecorder {
}
let danmu_stream = danmu_stream.unwrap();
// create a task to receive danmu message
let danmu_stream_clone = danmu_stream.clone();
tokio::spawn(async move {
let _ = danmu_stream_clone.start().await;
});
let mut start_fut = Box::pin(danmu_stream.start());
loop {
if let Ok(Some(msg)) = danmu_stream.recv().await {
match msg {
DanmuMessageType::DanmuMessage(danmu) => {
let ts = Utc::now().timestamp_millis();
let _ = self.event_channel.send(RecorderEvent::DanmuReceived {
room: self.room_id.clone(),
ts,
content: danmu.message.clone(),
});
if let Some(storage) = self.danmu_storage.write().await.as_ref() {
storage.add_line(ts, &danmu.message).await;
tokio::select! {
start_res = &mut start_fut => {
match start_res {
Ok(_) => {
log::info!("[{}]Danmu stream finished", &self.room_id);
return Ok(());
}
Err(err) => {
log::error!("[{}]Danmu stream start error: {}", &self.room_id, err);
return Err(crate::errors::RecorderError::DanmuStreamError(err));
}
}
}
recv_res = danmu_stream.recv() => {
match recv_res {
Ok(Some(msg)) => {
match msg {
DanmuMessageType::DanmuMessage(danmu) => {
let ts = Utc::now().timestamp_millis();
let _ = self.event_channel.send(RecorderEvent::DanmuReceived {
room: self.room_id.clone(),
ts,
content: danmu.message.clone(),
});
if let Some(storage) = self.danmu_storage.write().await.as_ref() {
storage.add_line(ts, &danmu.message).await;
}
}
}
}
Ok(None) => {
log::info!("[{}]Danmu stream closed", &self.room_id);
return Ok(());
}
Err(err) => {
log::error!("[{}]Failed to receive danmu message: {}", &self.room_id, err);
return Err(crate::errors::RecorderError::DanmuStreamError(err));
}
}
}
} else {
log::error!("[{}]Failed to receive danmu message", &self.room_id);
return Err(crate::errors::RecorderError::DanmuStreamError(
danmu_stream::DanmuStreamError::WebsocketError {
err: "Failed to receive danmu message".to_string(),
},
));
}
}
}

View File

@@ -190,34 +190,50 @@ impl DouyinRecorder {
}
let danmu_stream = danmu_stream.unwrap();
let danmu_stream_clone = danmu_stream.clone();
*self.danmu_task.lock().await = Some(tokio::spawn(async move {
let _ = danmu_stream_clone.start().await;
}));
let mut start_fut = Box::pin(danmu_stream.start());
loop {
if let Ok(Some(msg)) = danmu_stream.recv().await {
match msg {
DanmuMessageType::DanmuMessage(danmu) => {
let ts = Utc::now().timestamp_millis();
let _ = self.event_channel.send(RecorderEvent::DanmuReceived {
room: self.room_id.clone(),
ts,
content: danmu.message.clone(),
});
if let Some(danmu_storage) = self.danmu_storage.read().await.as_ref() {
danmu_storage.add_line(ts, &danmu.message).await;
tokio::select! {
start_res = &mut start_fut => {
match start_res {
Ok(_) => {
log::info!("Danmu stream finished");
return Ok(());
}
Err(err) => {
log::error!("Danmu stream start error: {err}");
return Err(crate::errors::RecorderError::DanmuStreamError(err));
}
}
}
recv_res = danmu_stream.recv() => {
match recv_res {
Ok(Some(msg)) => {
match msg {
DanmuMessageType::DanmuMessage(danmu) => {
let ts = Utc::now().timestamp_millis();
let _ = self.event_channel.send(RecorderEvent::DanmuReceived {
room: self.room_id.clone(),
ts,
content: danmu.message.clone(),
});
if let Some(danmu_storage) = self.danmu_storage.read().await.as_ref() {
danmu_storage.add_line(ts, &danmu.message).await;
}
}
}
}
Ok(None) => {
log::info!("Danmu stream closed");
return Ok(());
}
Err(err) => {
log::error!("Failed to receive danmu message: {err}");
return Err(crate::errors::RecorderError::DanmuStreamError(err));
}
}
}
} else {
log::error!("Failed to receive danmu message");
return Err(crate::errors::RecorderError::DanmuStreamError(
danmu_stream::DanmuStreamError::WebsocketError {
err: "Failed to receive danmu message".to_string(),
},
));
}
}
}
@@ -230,6 +246,11 @@ impl DouyinRecorder {
self.total_duration.store(0, atomic::Ordering::Relaxed);
self.total_size.store(0, atomic::Ordering::Relaxed);
*self.extra.live_stream.write().await = None;
if let Some(danmu_task) = self.danmu_task.lock().await.as_mut() {
danmu_task.abort();
let _ = danmu_task.await;
log::info!("Danmu task aborted");
}
}
async fn update_entries(&self, live_id: &str) -> Result<(), RecorderError> {
@@ -256,13 +277,6 @@ impl DouyinRecorder {
*self.danmu_storage.write().await = danmu_storage;
// Start danmu task
if let Some(danmu_task) = self.danmu_task.lock().await.as_mut() {
danmu_task.abort();
}
if let Some(danmu_stream_task) = self.danmu_task.lock().await.as_mut() {
danmu_stream_task.abort();
}
*self.live_id.write().await = live_id.to_string();
let self_clone = self.clone();
@@ -290,7 +304,7 @@ impl DouyinRecorder {
)
.await;
if let Err(e) = hls_recorder.start().await {
log::error!("[{}]Failed to start hls recorder: {}", self.room_id, e);
log::error!("[{}]Error from hls recorder: {}", self.room_id, e);
return Err(e);
}