fix: recorder adding back when removing (close #114)

Add a to_remove-set to filter recorders that are still in removing-stage,
so that monitor-thread wouldn't add them back.
This commit is contained in:
Xinrea
2025-06-08 10:37:04 +08:00
parent e462bd0b4c
commit 69a35af456

View File

@@ -15,7 +15,7 @@ use crate::recorder::RecorderInfo;
use chrono::Utc; use chrono::Utc;
use custom_error::custom_error; use custom_error::custom_error;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
@@ -65,6 +65,7 @@ pub struct RecorderManager {
db: Arc<Database>, db: Arc<Database>,
config: Arc<RwLock<Config>>, config: Arc<RwLock<Config>>,
recorders: Arc<RwLock<HashMap<String, Box<dyn Recorder>>>>, recorders: Arc<RwLock<HashMap<String, Box<dyn Recorder>>>>,
to_remove: Arc<RwLock<HashSet<String>>>,
event_tx: broadcast::Sender<RecorderEvent>, event_tx: broadcast::Sender<RecorderEvent>,
is_migrating: Arc<AtomicBool>, is_migrating: Arc<AtomicBool>,
} }
@@ -120,6 +121,7 @@ impl RecorderManager {
db, db,
config, config,
recorders: Arc::new(RwLock::new(HashMap::new())), recorders: Arc::new(RwLock::new(HashMap::new())),
to_remove: Arc::new(RwLock::new(HashSet::new())),
event_tx, event_tx,
is_migrating: Arc::new(AtomicBool::new(false)), is_migrating: Arc::new(AtomicBool::new(false)),
}; };
@@ -146,6 +148,7 @@ impl RecorderManager {
db: self.db.clone(), db: self.db.clone(),
config: self.config.clone(), config: self.config.clone(),
recorders: self.recorders.clone(), recorders: self.recorders.clone(),
to_remove: self.to_remove.clone(),
event_tx: self.event_tx.clone(), event_tx: self.event_tx.clone(),
is_migrating: self.is_migrating.clone(), is_migrating: self.is_migrating.clone(),
} }
@@ -291,7 +294,9 @@ impl RecorderManager {
let mut recorders_to_add = Vec::new(); let mut recorders_to_add = Vec::new();
for (platform, room_id) in recorder_map.keys() { for (platform, room_id) in recorder_map.keys() {
let recorder_id = format!("{}:{}", platform.as_str(), room_id); let recorder_id = format!("{}:{}", platform.as_str(), room_id);
if !self.recorders.read().await.contains_key(&recorder_id) { if !self.recorders.read().await.contains_key(&recorder_id)
&& !self.to_remove.read().await.contains(&recorder_id)
{
recorders_to_add.push((*platform, *room_id)); recorders_to_add.push((*platform, *room_id));
} }
} }
@@ -387,6 +392,10 @@ impl RecorderManager {
self.recorders.write().await.clear(); self.recorders.write().await.clear();
} }
/// Remove a recorder from the manager
///
/// This will stop the recorder and remove it from the manager
/// and remove the related cache folder
pub async fn remove_recorder( pub async fn remove_recorder(
&self, &self,
platform: PlatformType, platform: PlatformType,
@@ -398,6 +407,9 @@ impl RecorderManager {
return Err(RecorderManagerError::NotFound { room_id }); return Err(RecorderManagerError::NotFound { room_id });
} }
// add to to_remove
self.to_remove.write().await.insert(recorder_id.clone());
// stop recorder // stop recorder
if let Some(recorder_ref) = self.recorders.read().await.get(&recorder_id) { if let Some(recorder_ref) = self.recorders.read().await.get(&recorder_id) {
recorder_ref.stop().await; recorder_ref.stop().await;
@@ -406,6 +418,9 @@ impl RecorderManager {
// remove recorder // remove recorder
self.recorders.write().await.remove(&recorder_id); self.recorders.write().await.remove(&recorder_id);
// remove from to_remove
self.to_remove.write().await.remove(&recorder_id);
// remove related cache folder // remove related cache folder
let cache_folder = format!( let cache_folder = format!(
"{}/{}/{}", "{}/{}/{}",