feat: douyin danmu (#119)

* feat: migrate to uniformed interface for danmu stream

* feat: douyin danmu support (close #113)

* chore: fix typo

* fix: loop-decompress body
This commit is contained in:
Xinrea
2025-06-12 01:00:33 +08:00
committed by GitHub
parent 69a35af456
commit 66f671ffa0
28 changed files with 16769 additions and 361 deletions

View File

@@ -1,6 +1,7 @@
[[language]] [[language]]
name = "rust" name = "rust"
auto-format = true auto-format = true
rulers = []
[[language]] [[language]]
name = "svelte" name = "svelte"

625
src-tauri/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,7 @@
[workspace]
members = ["crates/danmu_stream"]
resolver = "2"
[package] [package]
name = "bili-shadowreplay" name = "bili-shadowreplay"
version = "1.0.0" version = "1.0.0"
@@ -10,6 +14,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
danmu_stream = { path = "crates/danmu_stream" }
serde_json = "1.0" serde_json = "1.0"
reqwest = { version = "0.11", features = ["blocking", "json"] } reqwest = { version = "0.11", features = ["blocking", "json"] }
serde_derive = "1.0.158" serde_derive = "1.0.158"
@@ -21,7 +26,6 @@ async-ffmpeg-sidecar = "0.0.1"
chrono = { version = "0.4.24", features = ["serde"] } chrono = { version = "0.4.24", features = ["serde"] }
toml = "0.7.3" toml = "0.7.3"
custom_error = "1.9.2" custom_error = "1.9.2"
felgens = { git = "https://github.com/Xinrea/felgens.git", tag = "v0.4.6" }
regex = "1.7.3" regex = "1.7.3"
tokio = { version = "1.27.0", features = ["process"] } tokio = { version = "1.27.0", features = ["process"] }
platform-dirs = "0.3.0" platform-dirs = "0.3.0"

View File

@@ -0,0 +1,43 @@
[package]
name = "danmu_stream"
version = "0.1.0"
edition = "2021"
[lib]
name = "danmu_stream"
path = "src/lib.rs"
[[example]]
name = "douyin"
path = "examples/douyin.rs"
[dependencies]
tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3"
prost = "0.12"
chrono = "0.4"
log = "0.4"
env_logger = "0.10"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
url = "2.4"
md5 = "0.7"
regex = "1.9"
deno_core = "0.242.0"
pct-str = "2.0.0"
custom_error = "1.9.2"
flate2 = "1.0"
scroll = "0.13.0"
scroll_derive = "0.13.0"
brotli = "8.0.1"
http = "1.0"
rand = "0.9.1"
urlencoding = "2.1.3"
gzip = "0.1.2"
hex = "0.4.3"
async-trait = "0.1.88"
[build-dependencies]
tonic-build = "0.10"

View File

View File

@@ -0,0 +1,40 @@
use std::{sync::Arc, time::Duration};
use danmu_stream::{danmu_stream::DanmuStream, provider::ProviderType, DanmuMessageType};
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::init();
// Replace these with actual values
let room_id = 7514298567821937427; // Replace with actual Douyin room_id. When live starts, the room_id will be generated, so it's more like a live_id.
let cookie = "your_cookie";
let stream = Arc::new(DanmuStream::new(ProviderType::Douyin, cookie, room_id).await?);
log::info!("Start to receive danmu messages");
let _ = stream.start().await;
let stream_clone = stream.clone();
tokio::spawn(async move {
loop {
if let Ok(Some(msg)) = stream_clone.recv().await {
match msg {
DanmuMessageType::DanmuMessage(danmu) => {
log::info!("Received danmu message: {:?}", danmu.message);
}
}
} else {
log::info!("Channel closed");
break;
}
}
});
sleep(Duration::from_secs(10)).await;
stream.stop().await?;
Ok(())
}

View File

@@ -0,0 +1,50 @@
use std::sync::Arc;
use crate::{
provider::{new, DanmuProvider, ProviderType},
DanmuMessageType, DanmuStreamError,
};
use tokio::sync::{mpsc, RwLock};
pub struct DanmuStream {
pub provider_type: ProviderType,
pub identifier: String,
pub room_id: u64,
pub provider: Arc<RwLock<Box<dyn DanmuProvider>>>,
tx: mpsc::UnboundedSender<DanmuMessageType>,
rx: Arc<RwLock<mpsc::UnboundedReceiver<DanmuMessageType>>>,
}
impl DanmuStream {
pub async fn new(
provider_type: ProviderType,
identifier: &str,
room_id: u64,
) -> Result<Self, DanmuStreamError> {
let (tx, rx) = mpsc::unbounded_channel();
let provider = new(provider_type, identifier, room_id).await?;
Ok(Self {
provider_type,
identifier: identifier.to_string(),
room_id,
provider: Arc::new(RwLock::new(provider)),
tx,
rx: Arc::new(RwLock::new(rx)),
})
}
pub async fn start(&self) -> Result<(), DanmuStreamError> {
self.provider.write().await.start(self.tx.clone()).await
}
pub async fn stop(&self) -> Result<(), DanmuStreamError> {
self.provider.write().await.stop().await?;
// close channel
self.rx.write().await.close();
Ok(())
}
pub async fn recv(&self) -> Result<Option<DanmuMessageType>, DanmuStreamError> {
Ok(self.rx.write().await.recv().await)
}
}

View File

@@ -0,0 +1,51 @@
use std::time::Duration;
use crate::DanmuStreamError;
use reqwest::header::HeaderMap;
impl From<reqwest::Error> for DanmuStreamError {
fn from(value: reqwest::Error) -> Self {
Self::HttpError { err: value }
}
}
impl From<url::ParseError> for DanmuStreamError {
fn from(value: url::ParseError) -> Self {
Self::ParseError { err: value }
}
}
pub struct ApiClient {
client: reqwest::Client,
header: HeaderMap,
}
impl ApiClient {
pub fn new(cookies: &str) -> Self {
let mut header = HeaderMap::new();
header.insert("cookie", cookies.parse().unwrap());
Self {
client: reqwest::Client::new(),
header,
}
}
pub async fn get(
&self,
url: &str,
query: Option<&[(&str, &str)]>,
) -> Result<reqwest::Response, DanmuStreamError> {
let resp = self
.client
.get(url)
.query(query.unwrap_or_default())
.headers(self.header.clone())
.timeout(Duration::from_secs(10))
.send()
.await?
.error_for_status()?;
Ok(resp)
}
}

View File

@@ -0,0 +1,29 @@
pub mod danmu_stream;
mod http_client;
pub mod provider;
use custom_error::custom_error;
custom_error! {pub DanmuStreamError
HttpError {err: reqwest::Error} = "HttpError {err}",
ParseError {err: url::ParseError} = "ParseError {err}",
WebsocketError {err: String } = "WebsocketError {err}",
PackError {err: String} = "PackError {err}",
UnsupportProto {proto: u16} = "UnsupportProto {proto}",
MessageParseError {err: String} = "MessageParseError {err}",
InvalidIdentifier {err: String} = "InvalidIdentifier {err}"
}
pub enum DanmuMessageType {
DanmuMessage(DanmuMessage),
}
#[derive(Debug, Clone)]
pub struct DanmuMessage {
pub room_id: u64,
pub user_id: u64,
pub user_name: String,
pub message: String,
pub color: u32,
pub timestamp: i64,
}

View File

@@ -0,0 +1,72 @@
mod bilibili;
mod douyin;
use async_trait::async_trait;
use tokio::sync::mpsc;
use crate::{
provider::bilibili::BiliDanmu, provider::douyin::DouyinDanmu, DanmuMessageType,
DanmuStreamError,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderType {
BiliBili,
Douyin,
}
#[async_trait]
pub trait DanmuProvider: Send + Sync {
async fn new(identifier: &str, room_id: u64) -> Result<Self, DanmuStreamError>
where
Self: Sized;
async fn start(
&self,
tx: mpsc::UnboundedSender<DanmuMessageType>,
) -> Result<(), DanmuStreamError>;
async fn stop(&self) -> Result<(), DanmuStreamError>;
}
/// Creates a new danmu stream provider for the specified platform.
///
/// This function initializes and starts a danmu stream provider based on the specified platform type.
/// The provider will fetch danmu messages and send them through the provided channel.
///
/// # Arguments
///
/// * `tx` - An unbounded sender channel that will receive danmu messages
/// * `provider_type` - The type of platform to fetch danmu from (BiliBili or Douyin)
/// * `identifier` - User validation information (e.g., cookies) required by the platform
/// * `room_id` - The unique identifier of the room/channel to fetch danmu from. Notice that douyin room_id is more like a live_id, it changes every time the live starts.
///
/// # Returns
///
/// Returns `Result<(), DanmmuStreamError>` where:
/// * `Ok(())` indicates successful initialization and start of the provider, only return after disconnect
/// * `Err(DanmmuStreamError)` indicates an error occurred during initialization or startup
///
/// # Examples
///
/// ```rust
/// use tokio::sync::mpsc;
/// let (tx, mut rx) = mpsc::unbounded_channel();
/// new(tx, ProviderType::BiliBili, "your_cookie", 123456).await?;
/// ```
pub async fn new(
provider_type: ProviderType,
identifier: &str,
room_id: u64,
) -> Result<Box<dyn DanmuProvider>, DanmuStreamError> {
match provider_type {
ProviderType::BiliBili => {
let bili = BiliDanmu::new(identifier, room_id).await?;
Ok(Box::new(bili))
}
ProviderType::Douyin => {
let douyin = DouyinDanmu::new(identifier, room_id).await?;
Ok(Box::new(douyin))
}
}
}

View File

@@ -0,0 +1,436 @@
mod dannmu_msg;
mod interact_word;
mod pack;
mod send_gift;
mod stream;
mod super_chat;
use std::{sync::Arc, time::SystemTime};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use log::{error, info};
use pct_str::{PctString, URIReserved};
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{mpsc, RwLock},
time::{sleep, Duration},
};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use crate::{
http_client::ApiClient,
provider::{DanmuMessageType, DanmuProvider},
DanmuStreamError,
};
type WsReadType = futures_util::stream::SplitStream<
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
>;
type WsWriteType = futures_util::stream::SplitSink<
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
Message,
>;
pub struct BiliDanmu {
client: ApiClient,
room_id: u64,
user_id: u64,
stop: Arc<RwLock<bool>>,
write: Arc<RwLock<Option<WsWriteType>>>,
}
#[async_trait]
impl DanmuProvider for BiliDanmu {
async fn new(cookie: &str, room_id: u64) -> Result<Self, DanmuStreamError> {
// find DedeUserID=<user_id> in cookie str
let user_id = BiliDanmu::parse_user_id(cookie)?;
let client = ApiClient::new(cookie);
Ok(Self {
client,
user_id,
room_id,
stop: Arc::new(RwLock::new(false)),
write: Arc::new(RwLock::new(None)),
})
}
async fn start(
&self,
tx: mpsc::UnboundedSender<DanmuMessageType>,
) -> Result<(), DanmuStreamError> {
let mut retry_count = 0;
const MAX_RETRIES: u32 = 5;
const RETRY_DELAY: Duration = Duration::from_secs(5);
info!(
"Bilibili WebSocket connection started, room_id: {}",
self.room_id
);
loop {
if *self.stop.read().await {
break;
}
match self.connect_and_handle(tx.clone()).await {
Ok(_) => {
info!("Bilibili WebSocket connection closed normally");
break;
}
Err(e) => {
error!("Bilibili WebSocket connection error: {}", e);
retry_count += 1;
if retry_count >= MAX_RETRIES {
return Err(DanmuStreamError::WebsocketError {
err: format!("Failed to connect after {} retries", MAX_RETRIES),
});
}
info!(
"Retrying connection in {} seconds... (Attempt {}/{})",
RETRY_DELAY.as_secs(),
retry_count,
MAX_RETRIES
);
tokio::time::sleep(RETRY_DELAY).await;
}
}
}
Ok(())
}
async fn stop(&self) -> Result<(), DanmuStreamError> {
*self.stop.write().await = true;
if let Some(mut write) = self.write.write().await.take() {
if let Err(e) = write.close().await {
error!("Failed to close WebSocket connection: {}", e);
}
}
Ok(())
}
}
impl BiliDanmu {
async fn connect_and_handle(
&self,
tx: mpsc::UnboundedSender<DanmuMessageType>,
) -> Result<(), DanmuStreamError> {
let wbi_key = self.get_wbi_key().await?;
let danmu_info = self.get_danmu_info(&wbi_key, self.room_id).await?;
let ws_hosts = danmu_info.data.host_list.clone();
let mut conn = None;
// try to connect to ws_hsots, once success, send the token to the tx
for i in ws_hosts {
let host = format!("wss://{}/sub", i.host);
match connect_async(&host).await {
Ok((c, _)) => {
conn = Some(c);
break;
}
Err(e) => {
eprintln!(
"Connect ws host: {} has error, trying next host ...\n{:?}\n{:?}",
host, i, e
);
}
}
}
let conn = conn.ok_or(DanmuStreamError::WebsocketError {
err: "Failed to connect to ws host".into(),
})?;
let (write, read) = conn.split();
*self.write.write().await = Some(write);
let json = serde_json::to_string(&WsSend {
roomid: self.room_id,
key: danmu_info.data.token,
uid: self.user_id,
protover: 3,
platform: "web".to_string(),
t: 2,
})
.map_err(|e| DanmuStreamError::WebsocketError { err: e.to_string() })?;
let json = pack::encode(&json, 7);
if let Some(write) = self.write.write().await.as_mut() {
write
.send(Message::binary(json))
.await
.map_err(|e| DanmuStreamError::WebsocketError { err: e.to_string() })?;
}
tokio::select! {
v = BiliDanmu::send_heartbeat_packets(Arc::clone(&self.write)) => v,
v = BiliDanmu::recv(read, tx, Arc::clone(&self.stop)) => v
}?;
Ok(())
}
async fn send_heartbeat_packets(
write: Arc<RwLock<Option<WsWriteType>>>,
) -> Result<(), DanmuStreamError> {
loop {
if let Some(write) = write.write().await.as_mut() {
write
.send(Message::binary(pack::encode("", 2)))
.await
.map_err(|e| DanmuStreamError::WebsocketError { err: e.to_string() })?;
}
sleep(Duration::from_secs(30)).await;
}
}
async fn recv(
mut read: WsReadType,
tx: mpsc::UnboundedSender<DanmuMessageType>,
stop: Arc<RwLock<bool>>,
) -> Result<(), DanmuStreamError> {
while let Ok(Some(msg)) = read.try_next().await {
if *stop.read().await {
log::info!("Stopping bilibili danmu stream");
break;
}
let data = msg.into_data();
if !data.is_empty() {
let s = pack::build_pack(&data);
if let Ok(msgs) = s {
for i in msgs {
let ws = stream::WsStreamCtx::new(&i);
if let Ok(ws) = ws {
match ws.match_msg() {
Ok(v) => {
tx.send(v).map_err(|e| DanmuStreamError::WebsocketError {
err: e.to_string(),
})?;
}
Err(e) => {
log::trace!(
"This message parsing is not yet supported:\nMessage: {i}\nErr: {e:#?}"
);
}
}
} else {
log::error!("{}", ws.unwrap_err());
}
}
}
}
}
Ok(())
}
async fn get_danmu_info(
&self,
wbi_key: &str,
room_id: u64,
) -> Result<DanmuInfo, DanmuStreamError> {
let room_id = self.get_real_room(wbi_key, room_id).await?;
let params = self
.get_sign(
wbi_key,
serde_json::json!({
"id": room_id,
"type": 0,
}),
)
.await?;
let resp = self
.client
.get(
&format!(
"https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo?{}",
params
),
None,
)
.await?
.json::<DanmuInfo>()
.await?;
Ok(resp)
}
async fn get_real_room(&self, wbi_key: &str, room_id: u64) -> Result<u64, DanmuStreamError> {
let params = self
.get_sign(
wbi_key,
serde_json::json!({
"id": room_id,
"from": "room",
}),
)
.await?;
let resp = self
.client
.get(
&format!(
"https://api.live.bilibili.com/room/v1/Room/room_init?{}",
params
),
None,
)
.await?
.json::<RoomInit>()
.await?
.data
.room_id;
Ok(resp)
}
fn parse_user_id(cookie: &str) -> Result<u64, DanmuStreamError> {
let mut user_id = None;
// find DedeUserID=<user_id> in cookie str
let re = Regex::new(r"DedeUserID=(\d+)").unwrap();
if let Some(captures) = re.captures(cookie) {
if let Some(user) = captures.get(1) {
user_id = Some(user.as_str().parse::<u64>().unwrap());
}
}
if let Some(user_id) = user_id {
Ok(user_id)
} else {
Err(DanmuStreamError::InvalidIdentifier {
err: format!("Failed to find user_id in cookie: {cookie}"),
})
}
}
async fn get_wbi_key(&self) -> Result<String, DanmuStreamError> {
let nav_info: serde_json::Value = self
.client
.get("https://api.bilibili.com/x/web-interface/nav", None)
.await?
.json()
.await?;
let re = Regex::new(r"wbi/(.*).png").unwrap();
let img = re
.captures(nav_info["data"]["wbi_img"]["img_url"].as_str().unwrap())
.unwrap()
.get(1)
.unwrap()
.as_str();
let sub = re
.captures(nav_info["data"]["wbi_img"]["sub_url"].as_str().unwrap())
.unwrap()
.get(1)
.unwrap()
.as_str();
let raw_string = format!("{}{}", img, sub);
Ok(raw_string)
}
pub async fn get_sign(
&self,
wbi_key: &str,
mut parameters: serde_json::Value,
) -> Result<String, DanmuStreamError> {
let table = vec![
46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49, 33, 9, 42,
19, 29, 28, 14, 39, 12, 38, 41, 13, 37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60,
51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52,
];
let raw_string = wbi_key;
let mut encoded = Vec::new();
table.into_iter().for_each(|x| {
if x < raw_string.len() {
encoded.push(raw_string.as_bytes()[x]);
}
});
// only keep 32 bytes of encoded
encoded = encoded[0..32].to_vec();
let encoded = String::from_utf8(encoded).unwrap();
// Timestamp in seconds
let wts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
parameters
.as_object_mut()
.unwrap()
.insert("wts".to_owned(), serde_json::Value::String(wts.to_string()));
// Get all keys from parameters into vec
let mut keys = parameters
.as_object()
.unwrap()
.keys()
.map(|x| x.to_owned())
.collect::<Vec<String>>();
// sort keys
keys.sort();
let mut params = String::new();
keys.iter().for_each(|x| {
params.push_str(x);
params.push('=');
// Convert value to string based on its type
let value = match parameters.get(x).unwrap() {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
_ => "".to_string(),
};
// Value filters !'()* characters
let value = value.replace(['!', '\'', '(', ')', '*'], "");
let value = PctString::encode(value.chars(), URIReserved);
params.push_str(value.as_str());
// add & if not last
if x != keys.last().unwrap() {
params.push('&');
}
});
// md5 params+encoded
let w_rid = md5::compute(params.to_string() + encoded.as_str());
let params = params + format!("&w_rid={:x}", w_rid).as_str();
Ok(params)
}
}
#[derive(Serialize)]
struct WsSend {
uid: u64,
roomid: u64,
key: String,
protover: u32,
platform: String,
#[serde(rename = "type")]
t: u32,
}
#[derive(Debug, Deserialize, Clone)]
pub struct DanmuInfo {
pub data: DanmuInfoData,
}
#[derive(Debug, Deserialize, Clone)]
pub struct DanmuInfoData {
pub token: String,
pub host_list: Vec<WsHost>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct WsHost {
pub host: String,
}
#[derive(Debug, Deserialize, Clone)]
pub struct RoomInit {
data: RoomInitData,
}
#[derive(Debug, Deserialize, Clone)]
pub struct RoomInitData {
room_id: u64,
}

View File

@@ -0,0 +1,88 @@
use serde::Deserialize;
use crate::{provider::bilibili::stream::WsStreamCtx, DanmuStreamError};
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct BiliDanmuMessage {
pub uid: u64,
pub username: String,
pub msg: String,
pub fan: Option<String>,
pub fan_level: Option<u64>,
pub timestamp: i64,
}
impl BiliDanmuMessage {
pub fn new_from_ctx(ctx: &WsStreamCtx) -> Result<Self, DanmuStreamError> {
let info = ctx
.info
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "info is None".to_string(),
})?;
let array_2 = info
.get(2)
.and_then(|x| x.as_array())
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "array_2 is None".to_string(),
})?
.to_owned();
let uid = array_2.first().and_then(|x| x.as_u64()).ok_or_else(|| {
DanmuStreamError::MessageParseError {
err: "uid is None".to_string(),
}
})?;
let username = array_2
.get(1)
.and_then(|x| x.as_str())
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "username is None".to_string(),
})?
.to_string();
let msg = info
.get(1)
.and_then(|x| x.as_str())
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "msg is None".to_string(),
})?
.to_string();
let array_3 = info
.get(3)
.and_then(|x| x.as_array())
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "array_3 is None".to_string(),
})?
.to_owned();
let fan = array_3
.get(1)
.and_then(|x| x.as_str())
.map(|x| x.to_owned());
let fan_level = array_3.first().and_then(|x| x.as_u64());
let timestamp = info
.first()
.and_then(|x| x.as_array())
.and_then(|x| x.get(4))
.and_then(|x| x.as_i64())
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "timestamp is None".to_string(),
})?;
Ok(Self {
uid,
username,
msg,
fan,
fan_level,
timestamp,
})
}
}

View File

@@ -0,0 +1,67 @@
use crate::{provider::bilibili::stream::WsStreamCtx, DanmuStreamError};
#[derive(Debug)]
#[allow(dead_code)]
pub struct InteractWord {
pub uid: u64,
pub uname: String,
pub fan: Option<String>,
pub fan_level: Option<u32>,
}
#[allow(dead_code)]
impl InteractWord {
pub fn new_from_ctx(ctx: &WsStreamCtx) -> Result<Self, DanmuStreamError> {
let data = ctx
.data
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "data is None".to_string(),
})?;
let uname = data
.uname
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "uname is None".to_string(),
})?
.to_string();
let uid = data
.uid
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "uid is None".to_string(),
})?
.as_u64()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "uid is None".to_string(),
})?;
let fan = data
.fans_medal
.as_ref()
.and_then(|x| x.medal_name.to_owned());
let fan = if fan == Some("".to_string()) {
None
} else {
fan
};
let fan_level = data.fans_medal.as_ref().and_then(|x| x.medal_level);
let fan_level = if fan_level == Some(0) {
None
} else {
fan_level
};
Ok(Self {
uid,
uname,
fan,
fan_level,
})
}
}

View File

@@ -0,0 +1,161 @@
// This file is copied from https://github.com/eatradish/felgens/blob/master/src/pack.rs
use std::io::Read;
use flate2::read::ZlibDecoder;
use scroll::Pread;
use scroll_derive::Pread;
use crate::DanmuStreamError;
#[derive(Debug, Pread, Clone)]
struct BilibiliPackHeader {
pack_len: u32,
_header_len: u16,
ver: u16,
_op: u32,
_seq: u32,
}
#[derive(Debug, Pread)]
struct PackHotCount {
count: u32,
}
type BilibiliPackCtx<'a> = (BilibiliPackHeader, &'a [u8]);
fn pack(buffer: &[u8]) -> Result<BilibiliPackCtx, DanmuStreamError> {
let data = buffer
.pread_with(0, scroll::BE)
.map_err(|e: scroll::Error| DanmuStreamError::PackError { err: e.to_string() })?;
let buf = &buffer[16..];
Ok((data, buf))
}
fn write_int(buffer: &[u8], start: usize, val: u32) -> Vec<u8> {
let val_bytes = val.to_be_bytes();
let mut buf = buffer.to_vec();
for (i, c) in val_bytes.iter().enumerate() {
buf[start + i] = *c;
}
buf
}
pub fn encode(s: &str, op: u8) -> Vec<u8> {
let data = s.as_bytes();
let packet_len = 16 + data.len();
let header = vec![0, 0, 0, 0, 0, 16, 0, 1, 0, 0, 0, op, 0, 0, 0, 1];
let header = write_int(&header, 0, packet_len as u32);
[&header, data].concat()
}
pub fn build_pack(buf: &[u8]) -> Result<Vec<String>, DanmuStreamError> {
let ctx = pack(buf)?;
let msgs = decode(ctx)?;
Ok(msgs)
}
fn get_hot_count(body: &[u8]) -> Result<u32, DanmuStreamError> {
let count = body
.pread_with::<PackHotCount>(0, scroll::BE)
.map_err(|e| DanmuStreamError::PackError { err: e.to_string() })?
.count;
Ok(count)
}
fn zlib_decode(body: &[u8]) -> Result<(BilibiliPackHeader, Vec<u8>), DanmuStreamError> {
let mut buf = vec![];
let mut z = ZlibDecoder::new(body);
z.read_to_end(&mut buf)
.map_err(|e| DanmuStreamError::PackError { err: e.to_string() })?;
let ctx = pack(&buf)?;
let header = ctx.0;
let buf = ctx.1.to_vec();
Ok((header, buf))
}
fn decode(ctx: BilibiliPackCtx) -> Result<Vec<String>, DanmuStreamError> {
let (mut header, body) = ctx;
let mut buf = body.to_vec();
loop {
(header, buf) = match header.ver {
2 => zlib_decode(&buf)?,
3 => brotli_decode(&buf)?,
0 | 1 => break,
_ => break,
}
}
let msgs = match header.ver {
0 => split_msgs(buf, header)?,
1 => vec![format!("{{\"count\": {}}}", get_hot_count(&buf)?)],
x => return Err(DanmuStreamError::UnsupportProto { proto: x }),
};
Ok(msgs)
}
fn split_msgs(buf: Vec<u8>, header: BilibiliPackHeader) -> Result<Vec<String>, DanmuStreamError> {
let mut buf = buf;
let mut header = header;
let mut msgs = vec![];
let mut offset = 0;
let buf_len = buf.len();
msgs.push(
std::str::from_utf8(&buf[..(header.pack_len - 16) as usize])
.map_err(|e| DanmuStreamError::PackError { err: e.to_string() })?
.to_string(),
);
buf = buf[(header.pack_len - 16) as usize..].to_vec();
offset += header.pack_len - 16;
while offset != buf_len as u32 {
let ctx = pack(&buf).map_err(|e| DanmuStreamError::PackError { err: e.to_string() })?;
header = ctx.0;
buf = ctx.1.to_vec();
msgs.push(
std::str::from_utf8(&buf[..(header.pack_len - 16) as usize])
.map_err(|e| DanmuStreamError::PackError { err: e.to_string() })?
.to_string(),
);
buf = buf[(header.pack_len - 16) as usize..].to_vec();
offset += header.pack_len;
}
Ok(msgs)
}
fn brotli_decode(body: &[u8]) -> Result<(BilibiliPackHeader, Vec<u8>), DanmuStreamError> {
let mut reader = brotli::Decompressor::new(body, 4096);
let mut buf = Vec::new();
reader
.read_to_end(&mut buf)
.map_err(|e| DanmuStreamError::PackError { err: e.to_string() })?;
let ctx = pack(&buf).map_err(|e| DanmuStreamError::PackError { err: e.to_string() })?;
let header = ctx.0;
let buf = ctx.1.to_vec();
Ok((header, buf))
}

View File

@@ -0,0 +1,115 @@
use serde::Deserialize;
use crate::{provider::bilibili::stream::WsStreamCtx, DanmuStreamError};
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct SendGift {
pub action: String,
pub gift_name: String,
pub num: u64,
pub uname: String,
pub uid: u64,
pub medal_name: Option<String>,
pub medal_level: Option<u32>,
pub price: u32,
}
#[allow(dead_code)]
impl SendGift {
pub fn new_from_ctx(ctx: &WsStreamCtx) -> Result<Self, DanmuStreamError> {
let data = ctx
.data
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "data is None".to_string(),
})?;
let action = data
.action
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "action is None".to_string(),
})?
.to_owned();
let combo_send = data.combo_send.clone();
let gift_name = if let Some(gift) = data.gift_name.as_ref() {
gift.to_owned()
} else if let Some(gift) = combo_send.clone().and_then(|x| x.gift_name) {
gift
} else {
return Err(DanmuStreamError::MessageParseError {
err: "gift_name is None".to_string(),
});
};
let num = if let Some(num) = combo_send.clone().and_then(|x| x.combo_num) {
num
} else if let Some(num) = data.num {
num
} else if let Some(num) = combo_send.and_then(|x| x.gift_num) {
num
} else {
return Err(DanmuStreamError::MessageParseError {
err: "num is None".to_string(),
});
};
let uname = data
.uname
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "uname is None".to_string(),
})?
.to_owned();
let uid = data
.uid
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "uid is None".to_string(),
})?
.as_u64()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "uid is None".to_string(),
})?;
let medal_name = data
.medal_info
.as_ref()
.and_then(|x| x.medal_name.to_owned());
let medal_level = data.medal_info.as_ref().and_then(|x| x.medal_level);
let medal_name = if medal_name == Some("".to_string()) {
None
} else {
medal_name
};
let medal_level = if medal_level == Some(0) {
None
} else {
medal_level
};
let price = data
.price
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "price is None".to_string(),
})?;
Ok(Self {
action,
gift_name,
num,
uname,
uid,
medal_name,
medal_level,
price,
})
}
}

View File

@@ -0,0 +1,97 @@
use serde::Deserialize;
use serde_json::Value;
use crate::{
provider::{bilibili::dannmu_msg::BiliDanmuMessage, DanmuMessageType},
DanmuStreamError, DanmuMessage,
};
#[derive(Debug, Deserialize, Clone)]
pub struct WsStreamCtx {
pub cmd: Option<String>,
pub info: Option<Vec<Value>>,
pub data: Option<WsStreamCtxData>,
#[serde(flatten)]
_v: Value,
}
#[derive(Debug, Deserialize, Clone)]
#[allow(dead_code)]
pub struct WsStreamCtxData {
pub message: Option<String>,
pub price: Option<u32>,
pub start_time: Option<u64>,
pub time: Option<u32>,
pub uid: Option<Value>,
pub user_info: Option<WsStreamCtxDataUser>,
pub medal_info: Option<WsStreamCtxDataMedalInfo>,
pub uname: Option<String>,
pub fans_medal: Option<WsStreamCtxDataMedalInfo>,
pub action: Option<String>,
#[serde(rename = "giftName")]
pub gift_name: Option<String>,
pub num: Option<u64>,
pub combo_num: Option<u64>,
pub gift_num: Option<u64>,
pub combo_send: Box<Option<WsStreamCtxData>>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct WsStreamCtxDataMedalInfo {
pub medal_name: Option<String>,
pub medal_level: Option<u32>,
}
#[derive(Debug, Deserialize, Clone)]
#[allow(dead_code)]
pub struct WsStreamCtxDataUser {
pub face: String,
pub uname: String,
}
impl WsStreamCtx {
pub fn new(s: &str) -> Result<Self, DanmuStreamError> {
serde_json::from_str(s).map_err(|_| DanmuStreamError::MessageParseError {
err: "Failed to parse message".to_string(),
})
}
pub fn match_msg(&self) -> Result<DanmuMessageType, DanmuStreamError> {
let cmd = self.handle_cmd();
let danmu_msg = match cmd {
Some(c) if c.contains("DANMU_MSG") => Some(BiliDanmuMessage::new_from_ctx(self)?),
_ => None,
};
if let Some(danmu_msg) = danmu_msg {
Ok(DanmuMessageType::DanmuMessage(DanmuMessage {
room_id: 0,
user_id: danmu_msg.uid,
user_name: danmu_msg.username,
message: danmu_msg.msg,
color: 0,
timestamp: danmu_msg.timestamp,
}))
} else {
Err(DanmuStreamError::MessageParseError {
err: "Unknown message".to_string(),
})
}
}
fn handle_cmd(&self) -> Option<&str> {
// handle DANMU_MSG:4:0:2:2:2:0
let cmd = if let Some(c) = self.cmd.as_deref() {
if c.starts_with("DANMU_MSG") {
Some("DANMU_MSG")
} else {
Some(c)
}
} else {
None
};
cmd
}
}

View File

@@ -0,0 +1,93 @@
use serde::Deserialize;
use crate::{provider::bilibili::stream::WsStreamCtx, DanmuStreamError};
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct SuperChatMessage {
pub uname: String,
pub uid: u64,
pub face: String,
pub price: u32,
pub start_time: u64,
pub time: u32,
pub msg: String,
pub medal_name: Option<String>,
pub medal_level: Option<u32>,
}
#[allow(dead_code)]
impl SuperChatMessage {
pub fn new_from_ctx(ctx: &WsStreamCtx) -> Result<Self, DanmuStreamError> {
let data = ctx
.data
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "data is None".to_string(),
})?;
let user_info =
data.user_info
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "user_info is None".to_string(),
})?;
let uname = user_info.uname.to_owned();
let uid = data.uid.as_ref().and_then(|x| x.as_u64()).ok_or_else(|| {
DanmuStreamError::MessageParseError {
err: "uid is None".to_string(),
}
})?;
let face = user_info.face.to_owned();
let price = data
.price
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "price is None".to_string(),
})?;
let start_time = data
.start_time
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "start_time is None".to_string(),
})?;
let time = data
.time
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "time is None".to_string(),
})?;
let msg = data
.message
.as_ref()
.ok_or_else(|| DanmuStreamError::MessageParseError {
err: "message is None".to_string(),
})?
.to_owned();
let medal = data
.medal_info
.as_ref()
.map(|x| (x.medal_name.to_owned(), x.medal_level.to_owned()));
let medal_name = medal.as_ref().and_then(|(name, _)| name.to_owned());
let medal_level = medal.and_then(|(_, level)| level);
Ok(Self {
uname,
uid,
face,
price,
start_time,
time,
msg,
medal_name,
medal_level,
})
}
}

View File

@@ -0,0 +1,463 @@
use crate::{provider::DanmuProvider, DanmuStreamError, DanmuMessage, DanmuMessageType};
use async_trait::async_trait;
use chrono;
use deno_core::v8;
use deno_core::JsRuntime;
use deno_core::RuntimeOptions;
use flate2::read::GzDecoder;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use log::debug;
use log::{error, info};
use prost::bytes::Bytes;
use prost::Message;
use std::io::Read;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio_tungstenite::{
connect_async, tungstenite::Message as WsMessage, MaybeTlsStream, WebSocketStream,
};
mod messages;
use messages::*;
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36";
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
type WsReadType = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
type WsWriteType =
futures_util::stream::SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>;
pub struct DouyinDanmu {
room_id: u64,
cookie: String,
stop: Arc<RwLock<bool>>,
write: Arc<RwLock<Option<WsWriteType>>>,
}
impl DouyinDanmu {
async fn connect_and_handle(
&self,
tx: mpsc::UnboundedSender<DanmuMessageType>,
) -> Result<(), DanmuStreamError> {
let url = self.get_wss_url().await?;
let request = tokio_tungstenite::tungstenite::http::Request::builder()
.uri(url)
.header(
tokio_tungstenite::tungstenite::http::header::COOKIE,
self.cookie.as_str(),
)
.header(
tokio_tungstenite::tungstenite::http::header::REFERER,
"https://live.douyin.com/",
)
.header(
tokio_tungstenite::tungstenite::http::header::USER_AGENT,
USER_AGENT,
)
.header(
tokio_tungstenite::tungstenite::http::header::HOST,
"webcast5-ws-web-hl.douyin.com",
)
.header(
tokio_tungstenite::tungstenite::http::header::UPGRADE,
"websocket",
)
.header(
tokio_tungstenite::tungstenite::http::header::CONNECTION,
"Upgrade",
)
.header(
tokio_tungstenite::tungstenite::http::header::SEC_WEBSOCKET_VERSION,
"13",
)
.header(
tokio_tungstenite::tungstenite::http::header::SEC_WEBSOCKET_EXTENSIONS,
"permessage-deflate; client_max_window_bits",
)
.header(
tokio_tungstenite::tungstenite::http::header::SEC_WEBSOCKET_KEY,
"V1Yza5x1zcfkembl6u/0Pg==",
)
.body(())
.unwrap();
let (ws_stream, response) =
connect_async(request)
.await
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to connect to douyin websocket: {}", e),
})?;
// Log the response status for debugging
info!("WebSocket connection response: {:?}", response.status());
let (write, read) = ws_stream.split();
*self.write.write().await = Some(write);
self.handle_connection(read, tx).await
}
async fn get_wss_url(&self) -> Result<String, DanmuStreamError> {
// Create a new V8 runtime
let mut runtime = JsRuntime::new(RuntimeOptions::default());
// Add global CryptoJS object
let crypto_js = include_str!("douyin/crypto-js.min.js");
runtime
.execute_script(
"<crypto-js.min.js>",
deno_core::FastString::Static(crypto_js),
)
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to execute crypto-js: {}", e),
})?;
// Load and execute the sign.js file
let js_code = include_str!("douyin/webmssdk.js");
runtime
.execute_script("<sign.js>", deno_core::FastString::Static(js_code))
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to execute JavaScript: {}", e),
})?;
// Call the get_wss_url function
let sign_call = format!("get_wss_url(\"{}\")", self.room_id);
let result = runtime
.execute_script(
"<sign_call>",
deno_core::FastString::Owned(sign_call.into_boxed_str()),
)
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to execute JavaScript: {}", e),
})?;
// Get the result from the V8 runtime
let scope = &mut runtime.handle_scope();
let local = v8::Local::new(scope, result);
let url = local.to_string(scope).unwrap().to_rust_string_lossy(scope);
debug!("Douyin wss url: {}", url);
Ok(url)
}
async fn handle_connection(
&self,
mut read: WsReadType,
tx: mpsc::UnboundedSender<DanmuMessageType>,
) -> Result<(), DanmuStreamError> {
// Start heartbeat task with error handling
let (tx_write, mut _rx_write) = mpsc::channel(32);
let tx_write_clone = tx_write.clone();
let stop = Arc::clone(&self.stop);
let heartbeat_handle = tokio::spawn(async move {
let mut last_heartbeat = SystemTime::now();
let mut consecutive_failures = 0;
const MAX_FAILURES: u32 = 3;
loop {
if *stop.read().await {
log::info!("Stopping douyin danmu stream");
break;
}
tokio::time::sleep(HEARTBEAT_INTERVAL).await;
match Self::send_heartbeat(&tx_write_clone).await {
Ok(_) => {
last_heartbeat = SystemTime::now();
consecutive_failures = 0;
}
Err(e) => {
error!("Failed to send heartbeat: {}", e);
consecutive_failures += 1;
if consecutive_failures >= MAX_FAILURES {
error!("Too many consecutive heartbeat failures, closing connection");
break;
}
// Check if we've exceeded the maximum time without a successful heartbeat
if let Ok(duration) = last_heartbeat.elapsed() {
if duration > HEARTBEAT_INTERVAL * 2 {
error!("No successful heartbeat for too long, closing connection");
break;
}
}
}
}
}
});
// Main message handling loop
let room_id = self.room_id;
let stop = Arc::clone(&self.stop);
let write = Arc::clone(&self.write);
let message_handle = tokio::spawn(async move {
while let Some(msg) =
read.try_next()
.await
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to read message: {}", e),
})?
{
if *stop.read().await {
log::info!("Stopping douyin danmu stream");
break;
}
match msg {
WsMessage::Binary(data) => {
if let Ok(Some(ack)) = handle_binary_message(&data, &tx, room_id).await {
if let Some(write) = write.write().await.as_mut() {
if let Err(e) =
write.send(WsMessage::Binary(ack.encode_to_vec())).await
{
error!("Failed to send ack: {}", e);
}
}
}
}
WsMessage::Close(_) => {
info!("WebSocket connection closed");
break;
}
WsMessage::Ping(data) => {
// Respond to ping with pong
if let Err(e) = tx_write.send(WsMessage::Pong(data)).await {
error!("Failed to send pong: {}", e);
break;
}
}
_ => {}
}
}
Ok::<(), DanmuStreamError>(())
});
// Wait for either the heartbeat or message handling to complete
tokio::select! {
result = heartbeat_handle => {
if let Err(e) = result {
error!("Heartbeat task failed: {}", e);
}
}
result = message_handle => {
if let Err(e) = result {
error!("Message handling task failed: {}", e);
}
}
}
Ok(())
}
async fn send_heartbeat(tx: &mpsc::Sender<WsMessage>) -> Result<(), DanmuStreamError> {
// heartbeat message: 3A 02 68 62
tx.send(WsMessage::Binary(vec![0x3A, 0x02, 0x68, 0x62]))
.await
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to send heartbeat message: {}", e),
})?;
Ok(())
}
}
async fn handle_binary_message(
data: &[u8],
tx: &mpsc::UnboundedSender<DanmuMessageType>,
room_id: u64,
) -> Result<Option<PushFrame>, DanmuStreamError> {
// First decode the PushFrame
let push_frame = PushFrame::decode(Bytes::from(data.to_vec())).map_err(|e| {
DanmuStreamError::WebsocketError {
err: format!("Failed to decode PushFrame: {}", e),
}
})?;
// Decompress the payload
let mut decoder = GzDecoder::new(push_frame.payload.as_slice());
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to decompress payload: {}", e),
})?;
// Decode the Response from decompressed payload
let response = Response::decode(Bytes::from(decompressed)).map_err(|e| {
DanmuStreamError::WebsocketError {
err: format!("Failed to decode Response: {}", e),
}
})?;
// if payload_package.needAck:
// obj = PushFrame()
// obj.payloadType = 'ack'
// obj.logId = log_id
// obj.payloadType = payload_package.internalExt
// ack = obj.SerializeToString()
let mut ack = None;
if response.need_ack {
let ack_msg = PushFrame {
payload_type: "ack".to_string(),
log_id: push_frame.log_id,
payload_encoding: "".to_string(),
payload: vec![],
seq_id: 0,
service: 0,
method: 0,
headers_list: vec![],
};
debug!("Need to respond ack: {:?}", ack_msg);
ack = Some(ack_msg);
}
for message in response.messages_list {
match message.method.as_str() {
"WebcastChatMessage" => {
let chat_msg =
DouyinChatMessage::decode(message.payload.as_slice()).map_err(|e| {
DanmuStreamError::WebsocketError {
err: format!("Failed to decode chat message: {}", e),
}
})?;
if let Some(user) = chat_msg.user {
let danmu_msg = DanmuMessage {
room_id,
user_id: user.id,
user_name: user.nick_name,
message: chat_msg.content,
color: 0xffffff,
timestamp: chrono::Utc::now().timestamp(),
};
debug!("Received danmu message: {:?}", danmu_msg);
tx.send(DanmuMessageType::DanmuMessage(danmu_msg))
.map_err(|e| DanmuStreamError::WebsocketError {
err: format!("Failed to send message to channel: {}", e),
})?;
}
}
"WebcastGiftMessage" => {
let gift_msg = GiftMessage::decode(message.payload.as_slice()).map_err(|e| {
DanmuStreamError::WebsocketError {
err: format!("Failed to decode gift message: {}", e),
}
})?;
if let Some(user) = gift_msg.user {
if let Some(gift) = gift_msg.gift {
log::debug!("Received gift: {} from user: {}", gift.name, user.nick_name);
}
}
}
"WebcastLikeMessage" => {
let like_msg = LikeMessage::decode(message.payload.as_slice()).map_err(|e| {
DanmuStreamError::WebsocketError {
err: format!("Failed to decode like message: {}", e),
}
})?;
if let Some(user) = like_msg.user {
log::debug!(
"Received {} likes from user: {}",
like_msg.count,
user.nick_name
);
}
}
"WebcastMemberMessage" => {
let member_msg =
MemberMessage::decode(message.payload.as_slice()).map_err(|e| {
DanmuStreamError::WebsocketError {
err: format!("Failed to decode member message: {}", e),
}
})?;
if let Some(user) = member_msg.user {
log::debug!(
"Member joined: {} (Action: {})",
user.nick_name,
member_msg.action_description
);
}
}
_ => {
debug!("Unknown message: {:?}", message);
}
}
}
Ok(ack)
}
#[async_trait]
impl DanmuProvider for DouyinDanmu {
async fn new(identifier: &str, room_id: u64) -> Result<Self, DanmuStreamError> {
Ok(Self {
room_id,
cookie: identifier.to_string(),
stop: Arc::new(RwLock::new(false)),
write: Arc::new(RwLock::new(None)),
})
}
async fn start(
&self,
tx: mpsc::UnboundedSender<DanmuMessageType>,
) -> Result<(), DanmuStreamError> {
let mut retry_count = 0;
const MAX_RETRIES: u32 = 5;
const RETRY_DELAY: Duration = Duration::from_secs(5);
info!(
"Douyin WebSocket connection started, room_id: {}",
self.room_id
);
loop {
if *self.stop.read().await {
break;
}
match self.connect_and_handle(tx.clone()).await {
Ok(_) => {
info!("Douyin WebSocket connection closed normally");
break;
}
Err(e) => {
error!("Douyin WebSocket connection error: {}", e);
retry_count += 1;
if retry_count >= MAX_RETRIES {
return Err(DanmuStreamError::WebsocketError {
err: format!("Failed to connect after {} retries", MAX_RETRIES),
});
}
info!(
"Retrying connection in {} seconds... (Attempt {}/{})",
RETRY_DELAY.as_secs(),
retry_count,
MAX_RETRIES
);
tokio::time::sleep(RETRY_DELAY).await;
}
}
}
Ok(())
}
async fn stop(&self) -> Result<(), DanmuStreamError> {
*self.stop.write().await = true;
if let Some(mut write) = self.write.write().await.take() {
if let Err(e) = write.close().await {
error!("Failed to close WebSocket connection: {}", e);
}
}
Ok(())
}
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,861 @@
use prost::Message;
use std::collections::HashMap;
// message Response {
// repeated Message messagesList = 1;
// string cursor = 2;
// uint64 fetchInterval = 3;
// uint64 now = 4;
// string internalExt = 5;
// uint32 fetchType = 6;
// map<string, string> routeParams = 7;
// uint64 heartbeatDuration = 8;
// bool needAck = 9;
// string pushServer = 10;
// string liveCursor = 11;
// bool historyNoMore = 12;
// }
#[derive(Message)]
pub struct Response {
#[prost(message, repeated, tag = "1")]
pub messages_list: Vec<CommonMessage>,
#[prost(string, tag = "2")]
pub cursor: String,
#[prost(uint64, tag = "3")]
pub fetch_interval: u64,
#[prost(uint64, tag = "4")]
pub now: u64,
#[prost(string, tag = "5")]
pub internal_ext: String,
#[prost(uint32, tag = "6")]
pub fetch_type: u32,
#[prost(map = "string, string", tag = "7")]
pub route_params: HashMap<String, String>,
#[prost(uint64, tag = "8")]
pub heartbeat_duration: u64,
#[prost(bool, tag = "9")]
pub need_ack: bool,
#[prost(string, tag = "10")]
pub push_server: String,
#[prost(string, tag = "11")]
pub live_cursor: String,
#[prost(bool, tag = "12")]
pub history_no_more: bool,
}
#[derive(Message)]
pub struct CommonMessage {
#[prost(string, tag = "1")]
pub method: String,
#[prost(bytes, tag = "2")]
pub payload: Vec<u8>,
#[prost(int64, tag = "3")]
pub msg_id: i64,
#[prost(int32, tag = "4")]
pub msg_type: i32,
#[prost(int64, tag = "5")]
pub offset: i64,
#[prost(bool, tag = "6")]
pub need_wrds_store: bool,
#[prost(int64, tag = "7")]
pub wrds_version: i64,
#[prost(string, tag = "8")]
pub wrds_sub_key: String,
}
#[derive(Message)]
pub struct Common {
#[prost(string, tag = "1")]
pub method: String,
#[prost(uint64, tag = "2")]
pub msg_id: u64,
#[prost(uint64, tag = "3")]
pub room_id: u64,
#[prost(uint64, tag = "4")]
pub create_time: u64,
#[prost(uint32, tag = "5")]
pub monitor: u32,
#[prost(bool, tag = "6")]
pub is_show_msg: bool,
#[prost(string, tag = "7")]
pub describe: String,
#[prost(uint64, tag = "9")]
pub fold_type: u64,
#[prost(uint64, tag = "10")]
pub anchor_fold_type: u64,
#[prost(uint64, tag = "11")]
pub priority_score: u64,
#[prost(string, tag = "12")]
pub log_id: String,
#[prost(string, tag = "13")]
pub msg_process_filter_k: String,
#[prost(string, tag = "14")]
pub msg_process_filter_v: String,
#[prost(message, optional, tag = "15")]
pub user: Option<User>,
}
#[derive(Message)]
pub struct User {
#[prost(uint64, tag = "1")]
pub id: u64,
#[prost(uint64, tag = "2")]
pub short_id: u64,
#[prost(string, tag = "3")]
pub nick_name: String,
#[prost(uint32, tag = "4")]
pub gender: u32,
#[prost(string, tag = "5")]
pub signature: String,
#[prost(uint32, tag = "6")]
pub level: u32,
#[prost(uint64, tag = "7")]
pub birthday: u64,
#[prost(string, tag = "8")]
pub telephone: String,
#[prost(message, optional, tag = "9")]
pub avatar_thumb: Option<Image>,
#[prost(message, optional, tag = "10")]
pub avatar_medium: Option<Image>,
#[prost(message, optional, tag = "11")]
pub avatar_large: Option<Image>,
#[prost(bool, tag = "12")]
pub verified: bool,
#[prost(uint32, tag = "13")]
pub experience: u32,
#[prost(string, tag = "14")]
pub city: String,
#[prost(int32, tag = "15")]
pub status: i32,
#[prost(uint64, tag = "16")]
pub create_time: u64,
#[prost(uint64, tag = "17")]
pub modify_time: u64,
#[prost(uint32, tag = "18")]
pub secret: u32,
#[prost(string, tag = "19")]
pub share_qrcode_uri: String,
#[prost(uint32, tag = "20")]
pub income_share_percent: u32,
#[prost(message, repeated, tag = "21")]
pub badge_image_list: Vec<Image>,
#[prost(message, optional, tag = "22")]
pub follow_info: Option<FollowInfo>,
#[prost(message, optional, tag = "23")]
pub pay_grade: Option<PayGrade>,
#[prost(message, optional, tag = "24")]
pub fans_club: Option<FansClub>,
#[prost(string, tag = "26")]
pub special_id: String,
#[prost(message, optional, tag = "27")]
pub avatar_border: Option<Image>,
#[prost(message, optional, tag = "28")]
pub medal: Option<Image>,
#[prost(message, repeated, tag = "29")]
pub real_time_icons_list: Vec<Image>,
#[prost(string, tag = "38")]
pub display_id: String,
#[prost(string, tag = "46")]
pub sec_uid: String,
#[prost(uint64, tag = "1022")]
pub fan_ticket_count: u64,
#[prost(string, tag = "1028")]
pub id_str: String,
#[prost(uint32, tag = "1045")]
pub age_range: u32,
}
#[derive(Message, PartialEq)]
pub struct Image {
#[prost(string, repeated, tag = "1")]
pub url_list_list: Vec<String>,
#[prost(string, tag = "2")]
pub uri: String,
#[prost(uint64, tag = "3")]
pub height: u64,
#[prost(uint64, tag = "4")]
pub width: u64,
#[prost(string, tag = "5")]
pub avg_color: String,
#[prost(uint32, tag = "6")]
pub image_type: u32,
#[prost(string, tag = "7")]
pub open_web_url: String,
#[prost(message, optional, tag = "8")]
pub content: Option<ImageContent>,
#[prost(bool, tag = "9")]
pub is_animated: bool,
#[prost(message, optional, tag = "10")]
pub flex_setting_list: Option<NinePatchSetting>,
#[prost(message, optional, tag = "11")]
pub text_setting_list: Option<NinePatchSetting>,
}
#[derive(Message, PartialEq)]
pub struct ImageContent {
#[prost(string, tag = "1")]
pub name: String,
#[prost(string, tag = "2")]
pub font_color: String,
#[prost(uint64, tag = "3")]
pub level: u64,
#[prost(string, tag = "4")]
pub alternative_text: String,
}
#[derive(Message, PartialEq)]
pub struct NinePatchSetting {
#[prost(string, repeated, tag = "1")]
pub setting_list_list: Vec<String>,
}
#[derive(Message)]
pub struct FollowInfo {
#[prost(uint64, tag = "1")]
pub following_count: u64,
#[prost(uint64, tag = "2")]
pub follower_count: u64,
#[prost(uint64, tag = "3")]
pub follow_status: u64,
#[prost(uint64, tag = "4")]
pub push_status: u64,
#[prost(string, tag = "5")]
pub remark_name: String,
#[prost(string, tag = "6")]
pub follower_count_str: String,
#[prost(string, tag = "7")]
pub following_count_str: String,
}
#[derive(Message)]
pub struct PayGrade {
#[prost(int64, tag = "1")]
pub total_diamond_count: i64,
#[prost(message, optional, tag = "2")]
pub diamond_icon: Option<Image>,
#[prost(string, tag = "3")]
pub name: String,
#[prost(message, optional, tag = "4")]
pub icon: Option<Image>,
#[prost(string, tag = "5")]
pub next_name: String,
#[prost(int64, tag = "6")]
pub level: i64,
#[prost(message, optional, tag = "7")]
pub next_icon: Option<Image>,
#[prost(int64, tag = "8")]
pub next_diamond: i64,
#[prost(int64, tag = "9")]
pub now_diamond: i64,
#[prost(int64, tag = "10")]
pub this_grade_min_diamond: i64,
#[prost(int64, tag = "11")]
pub this_grade_max_diamond: i64,
#[prost(int64, tag = "12")]
pub pay_diamond_bak: i64,
#[prost(string, tag = "13")]
pub grade_describe: String,
#[prost(message, repeated, tag = "14")]
pub grade_icon_list: Vec<GradeIcon>,
#[prost(int64, tag = "15")]
pub screen_chat_type: i64,
#[prost(message, optional, tag = "16")]
pub im_icon: Option<Image>,
#[prost(message, optional, tag = "17")]
pub im_icon_with_level: Option<Image>,
#[prost(message, optional, tag = "18")]
pub live_icon: Option<Image>,
#[prost(message, optional, tag = "19")]
pub new_im_icon_with_level: Option<Image>,
#[prost(message, optional, tag = "20")]
pub new_live_icon: Option<Image>,
#[prost(int64, tag = "21")]
pub upgrade_need_consume: i64,
#[prost(string, tag = "22")]
pub next_privileges: String,
#[prost(message, optional, tag = "23")]
pub background: Option<Image>,
#[prost(message, optional, tag = "24")]
pub background_back: Option<Image>,
#[prost(int64, tag = "25")]
pub score: i64,
#[prost(message, optional, tag = "26")]
pub buff_info: Option<GradeBuffInfo>,
}
#[derive(Message)]
pub struct GradeIcon {
#[prost(message, optional, tag = "1")]
pub icon: Option<Image>,
#[prost(int64, tag = "2")]
pub icon_diamond: i64,
#[prost(int64, tag = "3")]
pub level: i64,
#[prost(string, tag = "4")]
pub level_str: String,
}
#[derive(Message)]
pub struct GradeBuffInfo {}
#[derive(Message)]
pub struct FansClub {
#[prost(message, optional, tag = "1")]
pub data: Option<FansClubData>,
#[prost(map = "int32, message", tag = "2")]
pub prefer_data: HashMap<i32, FansClubData>,
}
#[derive(Message, PartialEq)]
pub struct FansClubData {
#[prost(string, tag = "1")]
pub club_name: String,
#[prost(int32, tag = "2")]
pub level: i32,
#[prost(int32, tag = "3")]
pub user_fans_club_status: i32,
#[prost(message, optional, tag = "4")]
pub badge: Option<UserBadge>,
#[prost(int64, repeated, tag = "5")]
pub available_gift_ids: Vec<i64>,
#[prost(int64, tag = "6")]
pub anchor_id: i64,
}
#[derive(Message, PartialEq)]
pub struct UserBadge {
#[prost(map = "int32, message", tag = "1")]
pub icons: HashMap<i32, Image>,
#[prost(string, tag = "2")]
pub title: String,
}
#[derive(Message)]
pub struct PublicAreaCommon {
#[prost(message, optional, tag = "1")]
pub user_label: Option<Image>,
#[prost(uint64, tag = "2")]
pub user_consume_in_room: u64,
#[prost(uint64, tag = "3")]
pub user_send_gift_cnt_in_room: u64,
}
#[derive(Message)]
pub struct LandscapeAreaCommon {
#[prost(bool, tag = "1")]
pub show_head: bool,
#[prost(bool, tag = "2")]
pub show_nickname: bool,
#[prost(bool, tag = "3")]
pub show_font_color: bool,
#[prost(string, repeated, tag = "4")]
pub color_value_list: Vec<String>,
#[prost(enumeration = "CommentTypeTag", repeated, tag = "5")]
pub comment_type_tags_list: Vec<i32>,
}
#[derive(Message)]
pub struct Text {
#[prost(string, tag = "1")]
pub key: String,
#[prost(string, tag = "2")]
pub default_patter: String,
#[prost(message, optional, tag = "3")]
pub default_format: Option<TextFormat>,
#[prost(message, repeated, tag = "4")]
pub pieces_list: Vec<TextPiece>,
}
#[derive(Message)]
pub struct TextFormat {
#[prost(string, tag = "1")]
pub color: String,
#[prost(bool, tag = "2")]
pub bold: bool,
#[prost(bool, tag = "3")]
pub italic: bool,
#[prost(uint32, tag = "4")]
pub weight: u32,
#[prost(uint32, tag = "5")]
pub italic_angle: u32,
#[prost(uint32, tag = "6")]
pub font_size: u32,
#[prost(bool, tag = "7")]
pub use_heigh_light_color: bool,
#[prost(bool, tag = "8")]
pub use_remote_clor: bool,
}
#[derive(Message)]
pub struct TextPiece {
#[prost(bool, tag = "1")]
pub r#type: bool,
#[prost(message, optional, tag = "2")]
pub format: Option<TextFormat>,
#[prost(string, tag = "3")]
pub string_value: String,
#[prost(message, optional, tag = "4")]
pub user_value: Option<TextPieceUser>,
#[prost(message, optional, tag = "5")]
pub gift_value: Option<TextPieceGift>,
#[prost(message, optional, tag = "6")]
pub heart_value: Option<TextPieceHeart>,
#[prost(message, optional, tag = "7")]
pub pattern_ref_value: Option<TextPiecePatternRef>,
#[prost(message, optional, tag = "8")]
pub image_value: Option<TextPieceImage>,
}
#[derive(Message)]
pub struct TextPieceUser {
#[prost(message, optional, tag = "1")]
pub user: Option<User>,
#[prost(bool, tag = "2")]
pub with_colon: bool,
}
#[derive(Message)]
pub struct TextPieceGift {
#[prost(uint64, tag = "1")]
pub gift_id: u64,
#[prost(message, optional, tag = "2")]
pub name_ref: Option<PatternRef>,
}
#[derive(Message)]
pub struct PatternRef {
#[prost(string, tag = "1")]
pub key: String,
#[prost(string, tag = "2")]
pub default_pattern: String,
}
#[derive(Message)]
pub struct TextPieceHeart {
#[prost(string, tag = "1")]
pub color: String,
}
#[derive(Message)]
pub struct TextPiecePatternRef {
#[prost(string, tag = "1")]
pub key: String,
#[prost(string, tag = "2")]
pub default_pattern: String,
}
#[derive(Message)]
pub struct TextPieceImage {
#[prost(message, optional, tag = "1")]
pub image: Option<Image>,
#[prost(float, tag = "2")]
pub scaling_rate: f32,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum CommentTypeTag {
CommentTypeTagUnknown = 0,
CommentTypeTagStar = 1,
}
#[derive(Message)]
pub struct DouyinChatMessage {
#[prost(message, optional, tag = "1")]
pub common: Option<Common>,
#[prost(message, optional, tag = "2")]
pub user: Option<User>,
#[prost(string, tag = "3")]
pub content: String,
#[prost(bool, tag = "4")]
pub visible_to_sender: bool,
#[prost(message, optional, tag = "5")]
pub background_image: Option<Image>,
#[prost(string, tag = "6")]
pub full_screen_text_color: String,
#[prost(message, optional, tag = "7")]
pub background_image_v2: Option<Image>,
#[prost(message, optional, tag = "9")]
pub public_area_common: Option<PublicAreaCommon>,
#[prost(message, optional, tag = "10")]
pub gift_image: Option<Image>,
#[prost(uint64, tag = "11")]
pub agree_msg_id: u64,
#[prost(uint32, tag = "12")]
pub priority_level: u32,
#[prost(message, optional, tag = "13")]
pub landscape_area_common: Option<LandscapeAreaCommon>,
#[prost(uint64, tag = "15")]
pub event_time: u64,
#[prost(bool, tag = "16")]
pub send_review: bool,
#[prost(bool, tag = "17")]
pub from_intercom: bool,
#[prost(bool, tag = "18")]
pub intercom_hide_user_card: bool,
#[prost(string, tag = "20")]
pub chat_by: String,
#[prost(uint32, tag = "21")]
pub individual_chat_priority: u32,
#[prost(message, optional, tag = "22")]
pub rtf_content: Option<Text>,
}
#[derive(Message)]
pub struct GiftMessage {
#[prost(message, optional, tag = "1")]
pub common: Option<Common>,
#[prost(uint64, tag = "2")]
pub gift_id: u64,
#[prost(uint64, tag = "3")]
pub fan_ticket_count: u64,
#[prost(uint64, tag = "4")]
pub group_count: u64,
#[prost(uint64, tag = "5")]
pub repeat_count: u64,
#[prost(uint64, tag = "6")]
pub combo_count: u64,
#[prost(message, optional, tag = "7")]
pub user: Option<User>,
#[prost(message, optional, tag = "8")]
pub to_user: Option<User>,
#[prost(uint32, tag = "9")]
pub repeat_end: u32,
#[prost(message, optional, tag = "10")]
pub text_effect: Option<TextEffect>,
#[prost(uint64, tag = "11")]
pub group_id: u64,
#[prost(uint64, tag = "12")]
pub income_taskgifts: u64,
#[prost(uint64, tag = "13")]
pub room_fan_ticket_count: u64,
#[prost(message, optional, tag = "14")]
pub priority: Option<GiftIMPriority>,
#[prost(message, optional, tag = "15")]
pub gift: Option<GiftStruct>,
#[prost(string, tag = "16")]
pub log_id: String,
#[prost(uint64, tag = "17")]
pub send_type: u64,
#[prost(message, optional, tag = "18")]
pub public_area_common: Option<PublicAreaCommon>,
#[prost(message, optional, tag = "19")]
pub tray_display_text: Option<Text>,
#[prost(uint64, tag = "20")]
pub banned_display_effects: u64,
#[prost(bool, tag = "25")]
pub display_for_self: bool,
#[prost(string, tag = "26")]
pub interact_gift_info: String,
#[prost(string, tag = "27")]
pub diy_item_info: String,
#[prost(uint64, repeated, tag = "28")]
pub min_asset_set_list: Vec<u64>,
#[prost(uint64, tag = "29")]
pub total_count: u64,
#[prost(uint32, tag = "30")]
pub client_gift_source: u32,
#[prost(uint64, repeated, tag = "32")]
pub to_user_ids_list: Vec<u64>,
#[prost(uint64, tag = "33")]
pub send_time: u64,
#[prost(uint64, tag = "34")]
pub force_display_effects: u64,
#[prost(string, tag = "35")]
pub trace_id: String,
#[prost(uint64, tag = "36")]
pub effect_display_ts: u64,
}
#[derive(Message)]
pub struct GiftStruct {
#[prost(message, optional, tag = "1")]
pub image: Option<Image>,
#[prost(string, tag = "2")]
pub describe: String,
#[prost(bool, tag = "3")]
pub notify: bool,
#[prost(uint64, tag = "4")]
pub duration: u64,
#[prost(uint64, tag = "5")]
pub id: u64,
#[prost(bool, tag = "7")]
pub for_linkmic: bool,
#[prost(bool, tag = "8")]
pub doodle: bool,
#[prost(bool, tag = "9")]
pub for_fansclub: bool,
#[prost(bool, tag = "10")]
pub combo: bool,
#[prost(uint32, tag = "11")]
pub r#type: u32,
#[prost(uint32, tag = "12")]
pub diamond_count: u32,
#[prost(bool, tag = "13")]
pub is_displayed_on_panel: bool,
#[prost(uint64, tag = "14")]
pub primary_effect_id: u64,
#[prost(message, optional, tag = "15")]
pub gift_label_icon: Option<Image>,
#[prost(string, tag = "16")]
pub name: String,
#[prost(string, tag = "17")]
pub region: String,
#[prost(string, tag = "18")]
pub manual: String,
#[prost(bool, tag = "19")]
pub for_custom: bool,
#[prost(message, optional, tag = "21")]
pub icon: Option<Image>,
#[prost(uint32, tag = "22")]
pub action_type: u32,
}
#[derive(Message)]
pub struct GiftIMPriority {
#[prost(uint64, repeated, tag = "1")]
pub queue_sizes_list: Vec<u64>,
#[prost(uint64, tag = "2")]
pub self_queue_priority: u64,
#[prost(uint64, tag = "3")]
pub priority: u64,
}
#[derive(Message)]
pub struct TextEffect {
#[prost(message, optional, tag = "1")]
pub portrait: Option<TextEffectDetail>,
#[prost(message, optional, tag = "2")]
pub landscape: Option<TextEffectDetail>,
}
#[derive(Message)]
pub struct TextEffectDetail {
#[prost(message, optional, tag = "1")]
pub text: Option<Text>,
#[prost(uint32, tag = "2")]
pub text_font_size: u32,
#[prost(message, optional, tag = "3")]
pub background: Option<Image>,
#[prost(uint32, tag = "4")]
pub start: u32,
#[prost(uint32, tag = "5")]
pub duration: u32,
#[prost(uint32, tag = "6")]
pub x: u32,
#[prost(uint32, tag = "7")]
pub y: u32,
#[prost(uint32, tag = "8")]
pub width: u32,
#[prost(uint32, tag = "9")]
pub height: u32,
#[prost(uint32, tag = "10")]
pub shadow_dx: u32,
#[prost(uint32, tag = "11")]
pub shadow_dy: u32,
#[prost(uint32, tag = "12")]
pub shadow_radius: u32,
#[prost(string, tag = "13")]
pub shadow_color: String,
#[prost(string, tag = "14")]
pub stroke_color: String,
#[prost(uint32, tag = "15")]
pub stroke_width: u32,
}
#[derive(Message)]
pub struct LikeMessage {
#[prost(message, optional, tag = "1")]
pub common: Option<Common>,
#[prost(uint64, tag = "2")]
pub count: u64,
#[prost(uint64, tag = "3")]
pub total: u64,
#[prost(uint64, tag = "4")]
pub color: u64,
#[prost(message, optional, tag = "5")]
pub user: Option<User>,
#[prost(string, tag = "6")]
pub icon: String,
#[prost(message, optional, tag = "7")]
pub double_like_detail: Option<DoubleLikeDetail>,
#[prost(message, optional, tag = "8")]
pub display_control_info: Option<DisplayControlInfo>,
#[prost(uint64, tag = "9")]
pub linkmic_guest_uid: u64,
#[prost(string, tag = "10")]
pub scene: String,
#[prost(message, optional, tag = "11")]
pub pico_display_info: Option<PicoDisplayInfo>,
}
#[derive(Message)]
pub struct DoubleLikeDetail {
#[prost(bool, tag = "1")]
pub double_flag: bool,
#[prost(uint32, tag = "2")]
pub seq_id: u32,
#[prost(uint32, tag = "3")]
pub renewals_num: u32,
#[prost(uint32, tag = "4")]
pub triggers_num: u32,
}
#[derive(Message)]
pub struct DisplayControlInfo {
#[prost(bool, tag = "1")]
pub show_text: bool,
#[prost(bool, tag = "2")]
pub show_icons: bool,
}
#[derive(Message)]
pub struct PicoDisplayInfo {
#[prost(uint64, tag = "1")]
pub combo_sum_count: u64,
#[prost(string, tag = "2")]
pub emoji: String,
#[prost(message, optional, tag = "3")]
pub emoji_icon: Option<Image>,
#[prost(string, tag = "4")]
pub emoji_text: String,
}
#[derive(Message)]
pub struct MemberMessage {
#[prost(message, optional, tag = "1")]
pub common: Option<Common>,
#[prost(message, optional, tag = "2")]
pub user: Option<User>,
#[prost(uint64, tag = "3")]
pub member_count: u64,
#[prost(message, optional, tag = "4")]
pub operator: Option<User>,
#[prost(bool, tag = "5")]
pub is_set_to_admin: bool,
#[prost(bool, tag = "6")]
pub is_top_user: bool,
#[prost(uint64, tag = "7")]
pub rank_score: u64,
#[prost(uint64, tag = "8")]
pub top_user_no: u64,
#[prost(uint64, tag = "9")]
pub enter_type: u64,
#[prost(uint64, tag = "10")]
pub action: u64,
#[prost(string, tag = "11")]
pub action_description: String,
#[prost(uint64, tag = "12")]
pub user_id: u64,
#[prost(message, optional, tag = "13")]
pub effect_config: Option<EffectConfig>,
#[prost(string, tag = "14")]
pub pop_str: String,
#[prost(message, optional, tag = "15")]
pub enter_effect_config: Option<EffectConfig>,
#[prost(message, optional, tag = "16")]
pub background_image: Option<Image>,
#[prost(message, optional, tag = "17")]
pub background_image_v2: Option<Image>,
#[prost(message, optional, tag = "18")]
pub anchor_display_text: Option<Text>,
#[prost(message, optional, tag = "19")]
pub public_area_common: Option<PublicAreaCommon>,
#[prost(uint64, tag = "20")]
pub user_enter_tip_type: u64,
#[prost(uint64, tag = "21")]
pub anchor_enter_tip_type: u64,
}
#[derive(Message)]
pub struct EffectConfig {
#[prost(uint64, tag = "1")]
pub r#type: u64,
#[prost(message, optional, tag = "2")]
pub icon: Option<Image>,
#[prost(uint64, tag = "3")]
pub avatar_pos: u64,
#[prost(message, optional, tag = "4")]
pub text: Option<Text>,
#[prost(message, optional, tag = "5")]
pub text_icon: Option<Image>,
#[prost(uint32, tag = "6")]
pub stay_time: u32,
#[prost(uint64, tag = "7")]
pub anim_asset_id: u64,
#[prost(message, optional, tag = "8")]
pub badge: Option<Image>,
#[prost(uint64, repeated, tag = "9")]
pub flex_setting_array_list: Vec<u64>,
#[prost(message, optional, tag = "10")]
pub text_icon_overlay: Option<Image>,
#[prost(message, optional, tag = "11")]
pub animated_badge: Option<Image>,
#[prost(bool, tag = "12")]
pub has_sweep_light: bool,
#[prost(uint64, repeated, tag = "13")]
pub text_flex_setting_array_list: Vec<u64>,
#[prost(uint64, tag = "14")]
pub center_anim_asset_id: u64,
#[prost(message, optional, tag = "15")]
pub dynamic_image: Option<Image>,
#[prost(map = "string, string", tag = "16")]
pub extra_map: HashMap<String, String>,
#[prost(uint64, tag = "17")]
pub mp4_anim_asset_id: u64,
#[prost(uint64, tag = "18")]
pub priority: u64,
#[prost(uint64, tag = "19")]
pub max_wait_time: u64,
#[prost(string, tag = "20")]
pub dress_id: String,
#[prost(uint64, tag = "21")]
pub alignment: u64,
#[prost(uint64, tag = "22")]
pub alignment_offset: u64,
}
// message PushFrame {
// uint64 seqId = 1;
// uint64 logId = 2;
// uint64 service = 3;
// uint64 method = 4;
// repeated HeadersList headersList = 5;
// string payloadEncoding = 6;
// string payloadType = 7;
// bytes payload = 8;
// }
#[derive(Message)]
pub struct PushFrame {
#[prost(uint64, tag = "1")]
pub seq_id: u64,
#[prost(uint64, tag = "2")]
pub log_id: u64,
#[prost(uint64, tag = "3")]
pub service: u64,
#[prost(uint64, tag = "4")]
pub method: u64,
#[prost(message, repeated, tag = "5")]
pub headers_list: Vec<HeadersList>,
#[prost(string, tag = "6")]
pub payload_encoding: String,
#[prost(string, tag = "7")]
pub payload_type: String,
#[prost(bytes, tag = "8")]
pub payload: Vec<u8>,
}
// message HeadersList {
// string key = 1;
// string value = 2;
// }
#[derive(Message)]
pub struct HeadersList {
#[prost(string, tag = "1")]
pub key: String,
#[prost(string, tag = "2")]
pub value: String,
}

File diff suppressed because one or more lines are too long

View File

@@ -100,6 +100,7 @@ async fn setup_logging(log_dir: &Path) -> Result<(), Box<dyn std::error::Error>>
.add_filter_ignore_str("sqlx") .add_filter_ignore_str("sqlx")
.add_filter_ignore_str("reqwest") .add_filter_ignore_str("reqwest")
.add_filter_ignore_str("h2") .add_filter_ignore_str("h2")
.add_filter_ignore_str("danmu_stream")
.build(); .build();
simplelog::CombinedLogger::init(vec![ simplelog::CombinedLogger::init(vec![

View File

@@ -14,16 +14,15 @@ use super::danmu::{DanmuEntry, DanmuStorage};
use super::entry::TsEntry; use super::entry::TsEntry;
use chrono::Utc; use chrono::Utc;
use client::{BiliClient, BiliStream, RoomInfo, StreamType, UserInfo}; use client::{BiliClient, BiliStream, RoomInfo, StreamType, UserInfo};
use danmu_stream::danmu_stream::DanmuStream;
use danmu_stream::provider::ProviderType;
use danmu_stream::DanmuMessageType;
use errors::BiliClientError; use errors::BiliClientError;
use felgens::{ws_socket_object, FelgensError, WsStreamMessageType};
use m3u8_rs::{Playlist, QuotedOrUnquoted, VariantStream}; use m3u8_rs::{Playlist, QuotedOrUnquoted, VariantStream};
use rand::Rng;
use regex::Regex; use regex::Regex;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::{broadcast, Mutex, RwLock}; use tokio::sync::{broadcast, Mutex, RwLock};
use url::Url; use url::Url;
@@ -32,7 +31,7 @@ use crate::database::{Database, DatabaseError};
use async_trait::async_trait; use async_trait::async_trait;
#[cfg(not(feature = "headless"))] #[cfg(feature = "gui")]
use {tauri::AppHandle, tauri_plugin_notification::NotificationExt}; use {tauri::AppHandle, tauri_plugin_notification::NotificationExt};
/// A recorder for BiliBili live streams /// A recorder for BiliBili live streams
@@ -42,7 +41,7 @@ use {tauri::AppHandle, tauri_plugin_notification::NotificationExt};
// TODO implement StreamType::TS // TODO implement StreamType::TS
#[derive(Clone)] #[derive(Clone)]
pub struct BiliRecorder { pub struct BiliRecorder {
#[cfg(not(feature = "headless"))] #[cfg(feature = "gui")]
app_handle: AppHandle, app_handle: AppHandle,
emitter: EventEmitter, emitter: EventEmitter,
client: Arc<RwLock<BiliClient>>, client: Arc<RwLock<BiliClient>>,
@@ -64,6 +63,8 @@ pub struct BiliRecorder {
danmu_storage: Arc<RwLock<Option<DanmuStorage>>>, danmu_storage: Arc<RwLock<Option<DanmuStorage>>>,
live_end_channel: broadcast::Sender<RecorderEvent>, live_end_channel: broadcast::Sender<RecorderEvent>,
enabled: Arc<RwLock<bool>>, enabled: Arc<RwLock<bool>>,
danmu_stream: Arc<RwLock<Option<DanmuStream>>>,
} }
impl From<DatabaseError> for super::errors::RecorderError { impl From<DatabaseError> for super::errors::RecorderError {
@@ -79,7 +80,7 @@ impl From<BiliClientError> for super::errors::RecorderError {
} }
pub struct BiliRecorderOptions { pub struct BiliRecorderOptions {
#[cfg(not(feature = "headless"))] #[cfg(feature = "gui")]
pub app_handle: AppHandle, pub app_handle: AppHandle,
pub emitter: EventEmitter, pub emitter: EventEmitter,
pub db: Arc<Database>, pub db: Arc<Database>,
@@ -111,7 +112,7 @@ impl BiliRecorder {
} }
let recorder = Self { let recorder = Self {
#[cfg(not(feature = "headless"))] #[cfg(feature = "gui")]
app_handle: options.app_handle, app_handle: options.app_handle,
emitter: options.emitter, emitter: options.emitter,
client: Arc::new(RwLock::new(client)), client: Arc::new(RwLock::new(client)),
@@ -133,6 +134,7 @@ impl BiliRecorder {
danmu_storage: Arc::new(RwLock::new(None)), danmu_storage: Arc::new(RwLock::new(None)),
live_end_channel: options.channel, live_end_channel: options.channel,
enabled: Arc::new(RwLock::new(options.auto_start)), enabled: Arc::new(RwLock::new(options.auto_start)),
danmu_stream: Arc::new(RwLock::new(None)),
}; };
log::info!("Recorder for room {} created.", options.room_id); log::info!("Recorder for room {} created.", options.room_id);
Ok(recorder) Ok(recorder)
@@ -177,7 +179,7 @@ impl BiliRecorder {
if live_status { if live_status {
if self.config.read().await.live_start_notify { if self.config.read().await.live_start_notify {
#[cfg(not(feature = "headless"))] #[cfg(feature = "gui")]
self.app_handle self.app_handle
.notification() .notification()
.builder() .builder()
@@ -202,7 +204,7 @@ impl BiliRecorder {
*self.cover.write().await = Some(cover_base64); *self.cover.write().await = Some(cover_base64);
} }
} else if self.config.read().await.live_end_notify { } else if self.config.read().await.live_end_notify {
#[cfg(not(feature = "headless"))] #[cfg(feature = "gui")]
self.app_handle self.app_handle
.notification() .notification()
.builder() .builder()
@@ -372,47 +374,62 @@ impl BiliRecorder {
Ok(stream) Ok(stream)
} }
async fn danmu(&self) { async fn danmu(&self) -> Result<(), super::errors::RecorderError> {
let cookies = self.account.cookies.clone(); let cookies = self.account.cookies.clone();
let uid: u64 = self.account.uid; let room_id = self.room_id;
while !*self.quit.lock().await { let danmu_stream = DanmuStream::new(ProviderType::BiliBili, &cookies, room_id).await;
let (tx, rx) = mpsc::unbounded_channel(); if danmu_stream.is_err() {
let ws = ws_socket_object(tx, uid, self.room_id, cookies.as_str()); let err = danmu_stream.err().unwrap();
if let Err(e) = tokio::select! {v = ws => v, v = self.recv(self.room_id,rx) => v} { log::error!("Failed to create danmu stream: {}", err);
log::error!("danmu error: {}", e); return Err(super::errors::RecorderError::DanmuStreamError { err });
}
// reconnect after 3s
log::warn!("danmu will reconnect after 3s");
tokio::time::sleep(Duration::from_secs(3)).await;
} }
let danmu_stream = danmu_stream.unwrap();
*self.danmu_stream.write().await = Some(danmu_stream);
log::info!("danmu thread {} quit.", self.room_id); // create a task to receive danmu message
} let self_clone = self.clone();
tokio::spawn(async move {
async fn recv( let _ = self_clone
&self, .danmu_stream
room: u64, .read()
mut rx: UnboundedReceiver<WsStreamMessageType>, .await
) -> Result<(), FelgensError> { .as_ref()
while let Some(msg) = rx.recv().await { .unwrap()
if *self.quit.lock().await { .start()
break; .await;
} });
if let WsStreamMessageType::DanmuMsg(msg) = msg {
self.emitter.emit(&Event::DanmuReceived { loop {
room, if let Ok(Some(msg)) = self
ts: msg.timestamp as i64, .danmu_stream
content: msg.msg.clone(), .read()
.await
.as_ref()
.unwrap()
.recv()
.await
{
match msg {
DanmuMessageType::DanmuMessage(danmu) => {
self.emitter.emit(&Event::DanmuReceived {
room: self.room_id,
ts: danmu.timestamp,
content: danmu.message.clone(),
}); });
if *self.live_status.read().await {
// save danmu
if let Some(storage) = self.danmu_storage.write().await.as_ref() { if let Some(storage) = self.danmu_storage.write().await.as_ref() {
storage.add_line(msg.timestamp as i64, &msg.msg).await; storage.add_line(danmu.timestamp, &danmu.message).await;
} }
} }
} }
} else {
log::error!("Failed to receive danmu message");
return Err(super::errors::RecorderError::DanmuStreamError {
err: danmu_stream::DanmuStreamError::WebsocketError {
err: "Failed to receive danmu message".to_string(),
},
});
}
} }
Ok(())
} }
async fn get_playlist(&self) -> Result<Playlist, super::errors::RecorderError> { async fn get_playlist(&self) -> Result<Playlist, super::errors::RecorderError> {
@@ -768,13 +785,12 @@ impl BiliRecorder {
// check stream is nearly expired // check stream is nearly expired
// WHY: when program started, all stream is fetched nearly at the same time, so they will expire toggether, // WHY: when program started, all stream is fetched nearly at the same time, so they will expire toggether,
// this might meet server rate limit. So we add a random offset to make request spread over time. // this might meet server rate limit. So we add a random offset to make request spread over time.
let mut rng = rand::thread_rng(); let pre_offset = rand::random::<u64>() % 181 + 120; // Random number between 120 and 300
let pre_offset = rng.gen_range(120..=300);
// no need to update stream as it's not expired yet // no need to update stream as it's not expired yet
let current_stream = self.live_stream.read().await.clone(); let current_stream = self.live_stream.read().await.clone();
if current_stream if current_stream
.as_ref() .as_ref()
.is_some_and(|s| s.expire - Utc::now().timestamp() < pre_offset) .is_some_and(|s| s.expire - Utc::now().timestamp() < pre_offset as i64)
{ {
log::info!("Stream is nearly expired, force update"); log::info!("Stream is nearly expired, force update");
self.force_update.store(true, Ordering::Relaxed); self.force_update.store(true, Ordering::Relaxed);
@@ -824,21 +840,24 @@ impl BiliRecorder {
impl super::Recorder for BiliRecorder { impl super::Recorder for BiliRecorder {
async fn run(&self) { async fn run(&self) {
let self_clone = self.clone(); let self_clone = self.clone();
thread::spawn(move || { tokio::spawn(async move {
let runtime = tokio::runtime::Runtime::new().unwrap(); log::info!("Start fetching danmu for room {}", self_clone.room_id);
runtime.block_on(async move { let _ = self_clone.danmu().await;
});
let self_clone = self.clone();
tokio::spawn(async move {
log::info!("Start running recorder for room {}", self_clone.room_id);
while !*self_clone.quit.lock().await { while !*self_clone.quit.lock().await {
let mut connection_fail_count = 0; let mut connection_fail_count = 0;
let mut rng = rand::thread_rng();
if self_clone.check_status().await { if self_clone.check_status().await {
// Live status is ok, start recording. // Live status is ok, start recording.
while self_clone.should_record().await { while self_clone.should_record().await {
match self_clone.update_entries().await { match self_clone.update_entries().await {
Ok(ms) => { Ok(ms) => {
if ms < 1000 { if ms < 1000 {
thread::sleep(std::time::Duration::from_millis( tokio::time::sleep(Duration::from_millis((1000 - ms) as u64))
(1000 - ms) as u64, .await;
));
} }
if ms >= 3000 { if ms >= 3000 {
log::warn!( log::warn!(
@@ -851,11 +870,7 @@ impl super::Recorder for BiliRecorder {
connection_fail_count = 0; connection_fail_count = 0;
} }
Err(e) => { Err(e) => {
log::error!( log::error!("[{}]Update entries error: {}", self_clone.room_id, e);
"[{}]Update entries error: {}",
self_clone.room_id,
e
);
if let RecorderError::BiliClientError { err: _ } = e { if let RecorderError::BiliClientError { err: _ } = e {
connection_fail_count = connection_fail_count =
std::cmp::min(5, connection_fail_count + 1); std::cmp::min(5, connection_fail_count + 1);
@@ -866,32 +881,23 @@ impl super::Recorder for BiliRecorder {
} }
*self_clone.is_recording.write().await = false; *self_clone.is_recording.write().await = false;
// go check status again after random 2-5 secs // go check status again after random 2-5 secs
let secs = rng.gen_range(2..=5); let secs = rand::random::<u64>() % 4 + 2;
tokio::time::sleep(Duration::from_secs( tokio::time::sleep(Duration::from_secs(
secs + 2_u64.pow(connection_fail_count), secs + 2_u64.pow(connection_fail_count),
)) ))
.await; .await;
continue; continue;
} }
thread::sleep(std::time::Duration::from_secs(
self_clone.config.read().await.status_check_interval,
));
} }
log::info!("recording thread {} quit.", self_clone.room_id);
});
});
// Thread for danmaku
let self_clone = self.clone();
thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async move {
self_clone.danmu().await;
});
}); });
} }
async fn stop(&self) { async fn stop(&self) {
*self.quit.lock().await = true; *self.quit.lock().await = true;
if let Some(danmu_stream) = self.danmu_stream.write().await.take() {
let _ = danmu_stream.stop().await;
}
log::info!("Recorder for room {} quit.", self.room_id);
} }
/// timestamp is the id of live stream /// timestamp is the id of live stream

View File

@@ -12,6 +12,7 @@ pub struct GeneralResponse {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)] #[serde(untagged)]
#[allow(clippy::large_enum_variant)]
pub enum Data { pub enum Data {
VideoSubmit(VideoSubmitData), VideoSubmit(VideoSubmitData),
Cover(CoverData), Cover(CoverData),

View File

@@ -7,16 +7,23 @@ use super::{
UserInfo, UserInfo,
}; };
use crate::database::Database; use crate::database::Database;
use crate::progress_manager::Event;
use crate::progress_reporter::EventEmitter;
use crate::recorder_manager::RecorderEvent; use crate::recorder_manager::RecorderEvent;
use crate::{config::Config, database::account::AccountRow}; use crate::{config::Config, database::account::AccountRow};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Utc; use chrono::Utc;
use client::DouyinClientError; use client::DouyinClientError;
use danmu_stream::danmu_stream::DanmuStream;
use danmu_stream::provider::ProviderType;
use danmu_stream::DanmuMessageType;
use rand::random; use rand::random;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
use super::danmu::DanmuStorage;
#[cfg(not(feature = "headless"))] #[cfg(not(feature = "headless"))]
use {tauri::AppHandle, tauri_plugin_notification::NotificationExt}; use {tauri::AppHandle, tauri_plugin_notification::NotificationExt};
@@ -42,33 +49,40 @@ impl From<DouyinClientError> for RecorderError {
pub struct DouyinRecorder { pub struct DouyinRecorder {
#[cfg(not(feature = "headless"))] #[cfg(not(feature = "headless"))]
app_handle: AppHandle, app_handle: AppHandle,
emitter: EventEmitter,
client: client::DouyinClient, client: client::DouyinClient,
db: Arc<Database>, db: Arc<Database>,
pub room_id: u64, account: AccountRow,
pub room_info: Arc<RwLock<Option<response::DouyinRoomInfoResponse>>>, room_id: u64,
pub stream_url: Arc<RwLock<Option<String>>>, room_info: Arc<RwLock<Option<response::DouyinRoomInfoResponse>>>,
pub entry_store: Arc<RwLock<Option<EntryStore>>>, stream_url: Arc<RwLock<Option<String>>>,
pub live_id: Arc<RwLock<String>>, entry_store: Arc<RwLock<Option<EntryStore>>>,
pub live_status: Arc<RwLock<LiveStatus>>, danmu_store: Arc<RwLock<Option<DanmuStorage>>>,
live_id: Arc<RwLock<String>>,
live_status: Arc<RwLock<LiveStatus>>,
is_recording: Arc<RwLock<bool>>, is_recording: Arc<RwLock<bool>>,
running: Arc<RwLock<bool>>, running: Arc<RwLock<bool>>,
last_update: Arc<RwLock<i64>>, last_update: Arc<RwLock<i64>>,
config: Arc<RwLock<Config>>, config: Arc<RwLock<Config>>,
live_end_channel: broadcast::Sender<RecorderEvent>, live_end_channel: broadcast::Sender<RecorderEvent>,
enabled: Arc<RwLock<bool>>, enabled: Arc<RwLock<bool>>,
danmu_stream: Arc<RwLock<Option<DanmuStream>>>,
} }
impl DouyinRecorder { impl DouyinRecorder {
#[allow(clippy::too_many_arguments)]
pub async fn new( pub async fn new(
#[cfg(not(feature = "headless"))] app_handle: AppHandle, #[cfg(not(feature = "headless"))] app_handle: AppHandle,
emitter: EventEmitter,
room_id: u64, room_id: u64,
config: Arc<RwLock<Config>>, config: Arc<RwLock<Config>>,
douyin_account: &AccountRow, account: &AccountRow,
db: &Arc<Database>, db: &Arc<Database>,
enabled: bool, enabled: bool,
channel: broadcast::Sender<RecorderEvent>, channel: broadcast::Sender<RecorderEvent>,
) -> Result<Self, super::errors::RecorderError> { ) -> Result<Self, super::errors::RecorderError> {
let client = client::DouyinClient::new(douyin_account); let client = client::DouyinClient::new(account);
let room_info = client.get_room_info(room_id).await?; let room_info = client.get_room_info(room_id).await?;
let mut live_status = LiveStatus::Offline; let mut live_status = LiveStatus::Offline;
if room_info.data.room_status == 0 { if room_info.data.room_status == 0 {
@@ -78,10 +92,13 @@ impl DouyinRecorder {
Ok(Self { Ok(Self {
#[cfg(not(feature = "headless"))] #[cfg(not(feature = "headless"))]
app_handle, app_handle,
emitter,
db: db.clone(), db: db.clone(),
account: account.clone(),
room_id, room_id,
live_id: Arc::new(RwLock::new(String::new())), live_id: Arc::new(RwLock::new(String::new())),
entry_store: Arc::new(RwLock::new(None)), entry_store: Arc::new(RwLock::new(None)),
danmu_store: Arc::new(RwLock::new(None)),
client, client,
room_info: Arc::new(RwLock::new(Some(room_info))), room_info: Arc::new(RwLock::new(Some(room_info))),
stream_url: Arc::new(RwLock::new(None)), stream_url: Arc::new(RwLock::new(None)),
@@ -92,6 +109,7 @@ impl DouyinRecorder {
last_update: Arc::new(RwLock::new(Utc::now().timestamp())), last_update: Arc::new(RwLock::new(Utc::now().timestamp())),
config, config,
live_end_channel: channel, live_end_channel: channel,
danmu_stream: Arc::new(RwLock::new(None)),
}) })
} }
@@ -210,6 +228,22 @@ impl DouyinRecorder {
let work_dir = self.get_work_dir(self.live_id.read().await.as_str()).await; let work_dir = self.get_work_dir(self.live_id.read().await.as_str()).await;
let entry_store = EntryStore::new(&work_dir).await; let entry_store = EntryStore::new(&work_dir).await;
*self.entry_store.write().await = Some(entry_store); *self.entry_store.write().await = Some(entry_store);
// setup danmu store
let danmu_file_path = format!("{}{}", work_dir, "danmu.txt");
let danmu_store = DanmuStorage::new(&danmu_file_path).await;
*self.danmu_store.write().await = danmu_store;
// start danmu task
if let Some(danmu_stream) = self.danmu_stream.write().await.take() {
let _ = danmu_stream.stop().await;
}
let live_id = self.live_id.read().await.clone();
let self_clone = self.clone();
tokio::spawn(async move {
log::info!("Start fetching danmu for live {}", live_id);
let _ = self_clone.danmu().await;
});
} }
true true
@@ -221,6 +255,69 @@ impl DouyinRecorder {
} }
} }
async fn danmu(&self) -> Result<(), super::errors::RecorderError> {
let cookies = self.account.cookies.clone();
let live_id = self
.live_id
.read()
.await
.clone()
.parse::<u64>()
.unwrap_or(0);
let danmu_stream = DanmuStream::new(ProviderType::Douyin, &cookies, live_id).await;
if danmu_stream.is_err() {
let err = danmu_stream.err().unwrap();
log::error!("Failed to create danmu stream: {}", err);
return Err(super::errors::RecorderError::DanmuStreamError { err });
}
let danmu_stream = danmu_stream.unwrap();
*self.danmu_stream.write().await = Some(danmu_stream);
let self_clone = self.clone();
tokio::spawn(async move {
let _ = self_clone
.danmu_stream
.read()
.await
.as_ref()
.unwrap()
.start()
.await;
});
loop {
if let Ok(Some(msg)) = self
.danmu_stream
.read()
.await
.as_ref()
.unwrap()
.recv()
.await
{
match msg {
DanmuMessageType::DanmuMessage(danmu) => {
self.emitter.emit(&Event::DanmuReceived {
room: self.room_id,
ts: danmu.timestamp,
content: danmu.message.clone(),
});
if let Some(storage) = self.danmu_store.read().await.as_ref() {
storage.add_line(danmu.timestamp, &danmu.message).await;
}
}
}
} else {
log::error!("Failed to receive danmu message");
return Err(super::errors::RecorderError::DanmuStreamError {
err: danmu_stream::DanmuStreamError::WebsocketError {
err: "Failed to receive danmu message".to_string(),
},
});
}
}
}
async fn reset(&self) { async fn reset(&self) {
*self.entry_store.write().await = None; *self.entry_store.write().await = None;
*self.live_id.write().await = String::new(); *self.live_id.write().await = String::new();

View File

@@ -19,4 +19,5 @@ custom_error! {pub RecorderError
BiliClientError {err: super::bilibili::errors::BiliClientError} = "BiliClient error: {err}", BiliClientError {err: super::bilibili::errors::BiliClientError} = "BiliClient error: {err}",
DouyinClientError {err: DouyinClientError} = "DouyinClient error: {err}", DouyinClientError {err: DouyinClientError} = "DouyinClient error: {err}",
IoError {err: std::io::Error} = "IO error: {err}", IoError {err: std::io::Error} = "IO error: {err}",
DanmuStreamError {err: danmu_stream::DanmuStreamError} = "Danmu stream error: {err}",
} }

View File

@@ -358,6 +358,7 @@ impl RecorderManager {
DouyinRecorder::new( DouyinRecorder::new(
#[cfg(not(feature = "headless"))] #[cfg(not(feature = "headless"))]
self.app_handle.clone(), self.app_handle.clone(),
self.emitter.clone(),
room_id, room_id,
self.config.clone(), self.config.clone(),
account, account,

View File

@@ -132,8 +132,8 @@
shaka.util.Error.Severity.CRITICAL, shaka.util.Error.Severity.CRITICAL,
shaka.util.Error.Category.NETWORK, shaka.util.Error.Category.NETWORK,
shaka.util.Error.Code.OPERATION_ABORTED, shaka.util.Error.Code.OPERATION_ABORTED,
error.message || "Network request failed", error.message || "Network request failed"
), )
); );
}); });
}); });
@@ -236,7 +236,7 @@
error.code + error.code +
"\n" + "\n" +
"Error message: " + "Error message: " +
error.message, error.message
); );
} }
} }
@@ -256,7 +256,7 @@
document.getElementsByClassName("shaka-fullscreen-button")[0].remove(); document.getElementsByClassName("shaka-fullscreen-button")[0].remove();
// add self-defined element in shaka-bottom-controls.shaka-no-propagation (second seekbar) // add self-defined element in shaka-bottom-controls.shaka-no-propagation (second seekbar)
const shakaBottomControls = document.querySelector( const shakaBottomControls = document.querySelector(
".shaka-bottom-controls.shaka-no-propagation", ".shaka-bottom-controls.shaka-no-propagation"
); );
const selfSeekbar = document.createElement("div"); const selfSeekbar = document.createElement("div");
selfSeekbar.className = "shaka-seek-bar shaka-no-propagation"; selfSeekbar.className = "shaka-seek-bar shaka-no-propagation";
@@ -287,7 +287,6 @@
let ts = parseInt(live_id); let ts = parseInt(live_id);
if (platform == "bilibili") {
let danmu_displayed = {}; let danmu_displayed = {};
// history danmaku sender // history danmaku sender
setInterval(() => { setInterval(() => {
@@ -301,7 +300,7 @@
} }
const cur = Math.floor( const cur = Math.floor(
(video.currentTime + global_offset + focus_start) * 1000, (video.currentTime + global_offset + focus_start) * 1000
); );
let danmus = danmu_records.filter((v) => { let danmus = danmu_records.filter((v) => {
@@ -317,6 +316,7 @@
}, 1000); }, 1000);
if (isLive()) { if (isLive()) {
if (platform == "bilibili") {
// add a account select // add a account select
const accountSelect = document.createElement("select"); const accountSelect = document.createElement("select");
accountSelect.style.height = "30px"; accountSelect.style.height = "30px";
@@ -327,7 +327,6 @@
accountSelect.style.padding = "0 10px"; accountSelect.style.padding = "0 10px";
accountSelect.style.boxSizing = "border-box"; accountSelect.style.boxSizing = "border-box";
accountSelect.style.fontSize = "1em"; accountSelect.style.fontSize = "1em";
// get accounts from tauri // get accounts from tauri
const account_info = (await invoke("get_accounts")) as AccountInfo; const account_info = (await invoke("get_accounts")) as AccountInfo;
account_info.accounts.forEach((account) => { account_info.accounts.forEach((account) => {
@@ -370,6 +369,7 @@
shakaSpacer.appendChild(accountSelect); shakaSpacer.appendChild(accountSelect);
shakaSpacer.appendChild(danmakuInput); shakaSpacer.appendChild(danmakuInput);
}
// listen to danmaku event // listen to danmaku event
await listen("danmu:" + room_id, (event: { payload: DanmuEntry }) => { await listen("danmu:" + room_id, (event: { payload: DanmuEntry }) => {
@@ -469,7 +469,6 @@
} }
shakaSpacer.appendChild(danmakuToggle); shakaSpacer.appendChild(danmakuToggle);
}
// create a playback rate select to of shaka-spacer // create a playback rate select to of shaka-spacer
const playbackRateSelect = document.createElement("select"); const playbackRateSelect = document.createElement("select");
@@ -501,7 +500,6 @@
let danmu_statistics: { ts: number; count: number }[] = []; let danmu_statistics: { ts: number; count: number }[] = [];
if (platform == "bilibili") {
// create a danmu statistics select into shaka-spacer // create a danmu statistics select into shaka-spacer
let statisticKey = ""; let statisticKey = "";
const statisticKeyInput = document.createElement("input"); const statisticKeyInput = document.createElement("input");
@@ -546,7 +544,6 @@
}); });
shakaSpacer.appendChild(statisticKeyInput); shakaSpacer.appendChild(statisticKeyInput);
}
// shaka-spacer should be flex-direction: column // shaka-spacer should be flex-direction: column
shakaSpacer.style.flexDirection = "column"; shakaSpacer.style.flexDirection = "column";
@@ -671,11 +668,11 @@
}); });
const seekbarContainer = selfSeekbar.querySelector( const seekbarContainer = selfSeekbar.querySelector(
".shaka-seek-bar-container.self-defined", ".shaka-seek-bar-container.self-defined"
) as HTMLElement; ) as HTMLElement;
const statisticGraph = document.createElement( const statisticGraph = document.createElement(
"canvas", "canvas"
) as HTMLCanvasElement; ) as HTMLCanvasElement;
statisticGraph.style.pointerEvents = "none"; statisticGraph.style.pointerEvents = "none";
statisticGraph.style.position = "absolute"; statisticGraph.style.position = "absolute";
@@ -768,7 +765,7 @@
}%, rgba(255, 255, 255, 0.2) ${first_point * 100}%)`; }%, rgba(255, 255, 255, 0.2) ${first_point * 100}%)`;
// render markers in shaka-ad-markers // render markers in shaka-ad-markers
const adMarkers = document.querySelector( const adMarkers = document.querySelector(
".shaka-ad-markers", ".shaka-ad-markers"
) as HTMLElement; ) as HTMLElement;
if (adMarkers) { if (adMarkers) {
// clean previous markers // clean previous markers