feat: 当新板子上线或音量变化时,推送当前音量给板子。

This commit is contained in:
2023-05-10 21:50:51 +08:00
parent 878180ed5b
commit 8b124f8182
3 changed files with 169 additions and 20 deletions

View File

@@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration};
use paris::{error, info, warn};
use tokio::{io, net::UdpSocket, sync::RwLock, task::yield_now, time::timeout};
use crate::rpc::DisplaySettingRequest;
use crate::{rpc::DisplaySettingRequest, volume::VolumeManager};
use super::{BoardConnectStatus, BoardInfo};
use super::{BoardConnectStatus, BoardInfo, BoardMessageChannels};
#[derive(Debug)]
pub struct Board {
pub info: Arc<RwLock<BoardInfo>>,
socket: Option<Arc<UdpSocket>>,
listen_handler: Option<tokio::task::JoinHandle<()>>,
volume_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
}
impl Board {
@@ -20,25 +21,21 @@ impl Board {
info: Arc::new(RwLock::new(info)),
socket: None,
listen_handler: None,
volume_changed_subscriber_handler: None,
}
}
pub async fn init_socket(&mut self) -> anyhow::Result<()> {
let info = self.info.read().await;
let info = self.info.clone();
let info = info.read().await;
let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket.connect((info.address, info.port)).await?;
let socket = Arc::new(socket);
self.socket = Some(socket.clone());
let info = self.info.clone();
let handler = tokio::spawn(async move {
let mut buf = [0u8; 128];
if let Err(err) = socket.readable().await {
error!("socket read error: {:?}", err);
return;
}
let board_message_channels = crate::rpc::channels::BoardMessageChannels::global().await;
@@ -82,9 +79,52 @@ impl Board {
});
self.listen_handler = Some(handler);
self.subscribe_volume_changed().await;
Ok(())
}
async fn subscribe_volume_changed(&mut self) {
let channel = BoardMessageChannels::global().await;
let mut volume_changed_rx = channel.volume_changed_sender.subscribe();
let info = self.info.clone();
let socket = self.socket.clone();
let handler = tokio::spawn(async move {
while let Ok(volume) = volume_changed_rx.recv().await {
let info = info.read().await;
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
log::info!("board is not connected, skip send volume changed");
continue;
}
let socket = socket.as_ref().unwrap();
let mut buf = [0u8; 2];
buf[0] = 4;
buf[1] = (volume * 100.0) as u8;
if let Err(err) = socket.send(&buf).await {
log::warn!("send volume changed failed: {:?}", err);
}
}
});
let volume_manager = VolumeManager::global().await;
let volume = volume_manager.get_volume().await;
if let Some(socket) = self.socket.as_ref() {
let buf = [4, (volume * 100.0) as u8];
if let Err(err) = socket.send(&buf).await {
log::warn!("send volume failed: {:?}", err);
}
} else {
log::warn!("socket is none, skip send volume");
}
self.volume_changed_subscriber_handler = Some(handler);
}
pub async fn send_colors(&self, buf: &[u8]) {
let info = self.info.read().await;
if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
@@ -155,12 +195,14 @@ impl Board {
impl Drop for Board {
fn drop(&mut self) {
info!("board drop");
if let Some(handler) = self.listen_handler.take() {
info!("aborting listen handler");
tokio::task::block_in_place(move || {
handler.abort();
});
info!("listen handler aborted");
handler.abort();
}
if let Some(handler) = self.volume_changed_subscriber_handler.take() {
handler.abort();
}
}
}

View File

@@ -7,6 +7,7 @@ use super::DisplaySettingRequest;
pub struct BoardMessageChannels {
pub display_setting_request_sender: Arc<broadcast::Sender<DisplaySettingRequest>>,
pub volume_setting_request_sender: Arc<broadcast::Sender<f32>>,
pub volume_changed_sender: Arc<broadcast::Sender<f32>>,
}
impl BoardMessageChannels {
@@ -23,9 +24,13 @@ impl BoardMessageChannels {
let (volume_setting_request_sender, _) = broadcast::channel(16);
let volume_setting_request_sender = Arc::new(volume_setting_request_sender);
let (volume_changed_sender, _) = broadcast::channel(2);
let volume_changed_sender = Arc::new(volume_changed_sender);
Self {
display_setting_request_sender,
volume_setting_request_sender,
volume_changed_sender,
}
}
}