diff --git a/src-tauri/src/ambient_light/config_manager.rs b/src-tauri/src/ambient_light/config_manager.rs index 179af70..2d5a0eb 100644 --- a/src-tauri/src/ambient_light/config_manager.rs +++ b/src-tauri/src/ambient_light/config_manager.rs @@ -1,7 +1,7 @@ use std::{borrow::BorrowMut, sync::Arc}; use tauri::async_runtime::RwLock; -use tokio::sync::OnceCell; +use tokio::{sync::OnceCell, task::yield_now}; use crate::ambient_light::{config, LedStripConfigGroup}; @@ -9,7 +9,6 @@ use super::{Border, SamplePointMapper, ColorCalibration}; pub struct ConfigManager { config: Arc>, - config_update_receiver: tokio::sync::watch::Receiver, config_update_sender: tokio::sync::watch::Sender, } @@ -22,10 +21,12 @@ impl ConfigManager { let (config_update_sender, config_update_receiver) = tokio::sync::watch::channel(configs.clone()); - config_update_sender.send(configs.clone()).unwrap(); + if let Err(err) = config_update_sender.send(configs.clone()) { + log::error!("Failed to send config update when read config first time: {}", err); + } + drop(config_update_receiver); ConfigManager { config: Arc::new(RwLock::new(configs)), - config_update_receiver, config_update_sender, } }) @@ -46,8 +47,9 @@ impl ConfigManager { self.config_update_sender .send(configs.clone()) .map_err(|e| anyhow::anyhow!("Failed to send config update: {}", e))?; + yield_now().await; - // log::info!("config updated: {:?}", configs); + log::debug!("config updated: {:?}", configs); Ok(()) } @@ -221,7 +223,7 @@ impl ConfigManager { pub fn clone_config_update_receiver( &self, ) -> tokio::sync::watch::Receiver { - self.config_update_receiver.clone() + self.config_update_sender.subscribe() } pub async fn set_color_calibration(&self, color_calibration: ColorCalibration) -> anyhow::Result<()> { diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index ee9eae9..fd04964 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -13,7 +13,7 @@ use ambient_light::{Border, ColorCalibration, LedStripConfig, LedStripConfigGrou use display::{DisplayManager, DisplayState}; use display_info::DisplayInfo; use paris::{error, info, warn}; -use rpc::{BoardInfo, MqttRpc, UdpRpc}; +use rpc::{BoardInfo, UdpRpc}; use screenshot::Screenshot; use screenshot_manager::ScreenshotManager; use serde::{Deserialize, Serialize}; @@ -223,8 +223,6 @@ async fn main() { let led_color_publisher = ambient_light::LedColorsPublisher::global().await; led_color_publisher.start(); - let _mqtt = MqttRpc::global().await; - let _volume = VolumeManager::global().await; tauri::Builder::default() @@ -375,8 +373,7 @@ async fn main() { let app_handle = app.handle().clone(); tokio::spawn(async move { let config_manager = ambient_light::ConfigManager::global().await; - let config_update_receiver = config_manager.clone_config_update_receiver(); - let mut config_update_receiver = config_update_receiver; + let mut config_update_receiver = config_manager.clone_config_update_receiver(); loop { if let Err(err) = config_update_receiver.changed().await { error!("config update receiver changed error: {}", err); diff --git a/src-tauri/src/rpc/board.rs b/src-tauri/src/rpc/board.rs index d74a5db..13fb607 100644 --- a/src-tauri/src/rpc/board.rs +++ b/src-tauri/src/rpc/board.rs @@ -3,7 +3,11 @@ use std::{sync::Arc, time::Duration}; use paris::{error, info, warn}; use tokio::{io, net::UdpSocket, sync::RwLock, task::yield_now, time::timeout}; -use crate::{rpc::DisplaySettingRequest, volume::{VolumeManager, self}}; +use crate::{ + ambient_light::{ConfigManager, LedStripConfig}, + rpc::DisplaySettingRequest, + volume::{self, VolumeManager}, +}; use super::{BoardConnectStatus, BoardInfo, BoardMessageChannels}; @@ -14,6 +18,7 @@ pub struct Board { listen_handler: Option>, volume_changed_subscriber_handler: Option>, state_of_displays_changed_subscriber_handler: Option>, + led_strip_config_changed_subscriber_handler: Option>, } impl Board { @@ -24,6 +29,7 @@ impl Board { listen_handler: None, volume_changed_subscriber_handler: None, state_of_displays_changed_subscriber_handler: None, + led_strip_config_changed_subscriber_handler: None, } } @@ -82,7 +88,8 @@ impl Board { self.listen_handler = Some(handler); self.subscribe_volume_changed().await; - self.state_of_displays_changed().await; + self.subscribe_state_of_displays_changed().await; + self.subscribe_led_strip_config_changed().await; Ok(()) } @@ -95,17 +102,18 @@ impl Board { let handler = tokio::spawn(async move { loop { - let volume: Result = volume_changed_rx.recv().await; + let volume: Result = + volume_changed_rx.recv().await; if let Err(err) = volume { match err { tokio::sync::broadcast::error::RecvError::Closed => { log::error!("volume changed channel closed"); break; - }, + } tokio::sync::broadcast::error::RecvError::Lagged(_) => { log::info!("volume changed channel lagged"); continue; - }, + } } } @@ -144,25 +152,28 @@ impl Board { self.volume_changed_subscriber_handler = Some(handler); } - async fn state_of_displays_changed(&mut self) { + async fn subscribe_state_of_displays_changed(&mut self) { let channel: &BoardMessageChannels = BoardMessageChannels::global().await; let mut state_of_displays_changed_rx = channel.displays_changed_sender.subscribe(); let info = self.info.clone(); let socket = self.socket.clone(); let handler = tokio::spawn(async move { - loop { - let states: Result, tokio::sync::broadcast::error::RecvError> = state_of_displays_changed_rx.recv().await; + loop { + let states: Result< + Vec, + tokio::sync::broadcast::error::RecvError, + > = state_of_displays_changed_rx.recv().await; if let Err(err) = states { match err { tokio::sync::broadcast::error::RecvError::Closed => { log::error!("state of displays changed channel closed"); break; - }, + } tokio::sync::broadcast::error::RecvError::Lagged(_) => { log::info!("state of displays changed channel lagged"); continue; - }, + } } } @@ -187,13 +198,45 @@ impl Board { log::warn!("send state of displays changed failed: {:?}", err); } } - - } + } }); self.state_of_displays_changed_subscriber_handler = Some(handler); } + async fn subscribe_led_strip_config_changed(&mut self) { + let config_manager = ConfigManager::global().await; + let mut led_strip_config_changed_rx = config_manager.clone_config_update_receiver(); + let info = self.info.clone(); + let socket = self.socket.clone(); + + let handler = tokio::spawn(async move { + while led_strip_config_changed_rx.changed().await.is_ok() { + let config = led_strip_config_changed_rx.borrow().clone(); + + let info = info.read().await; + if socket.is_none() || info.connect_status != BoardConnectStatus::Connected { + log::info!("board is not connected, skip send led strip config changed"); + continue; + } + + let socket = socket.as_ref().unwrap(); + + let mut buf = [0u8; 4]; + buf[0] = 5; + buf[1..].copy_from_slice(&config.color_calibration.to_bytes()); + + log::info!("send led strip config changed: {:?}", &buf[..]); + + if let Err(err) = socket.send(&buf).await { + log::warn!("send led strip config changed failed: {:?}", err); + } + } + }); + + self.led_strip_config_changed_subscriber_handler = Some(handler); + } + pub async fn send_colors(&self, buf: &[u8]) { let info = self.info.read().await; if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected { @@ -278,5 +321,8 @@ impl Drop for Board { handler.abort(); } + if let Some(handler) = self.led_strip_config_changed_subscriber_handler.take() { + handler.abort(); + } } } diff --git a/src-tauri/src/rpc/mod.rs b/src-tauri/src/rpc/mod.rs index 1a32233..5602564 100644 --- a/src-tauri/src/rpc/mod.rs +++ b/src-tauri/src/rpc/mod.rs @@ -1,12 +1,10 @@ mod board_info; -mod mqtt; mod udp; mod board; mod display_setting_request; mod channels; pub use board_info::*; -pub use mqtt::*; pub use udp::*; pub use board::*; pub use display_setting_request::*; diff --git a/src-tauri/src/rpc/mqtt.rs b/src-tauri/src/rpc/mqtt.rs deleted file mode 100644 index 4bcd5ed..0000000 --- a/src-tauri/src/rpc/mqtt.rs +++ /dev/null @@ -1,163 +0,0 @@ -use paho_mqtt as mqtt; -use paris::{info, warn}; -use serde_json::json; -use std::time::Duration; -use time::{format_description, OffsetDateTime}; -use tokio::{sync::OnceCell, task}; - -use crate::ambient_light::{ColorCalibration, ConfigManager}; - -const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; -const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop"; -const COLOR_CALIBRATION: &'static str = "display-ambient-light/desktop/color-calibration"; - -pub struct MqttRpc { - client: mqtt::AsyncClient, - // change_display_brightness_tx: broadcast::Sender, - // message_tx: broadcast::Sender, -} - -impl MqttRpc { - pub async fn global() -> &'static Self { - static MQTT_RPC: OnceCell = OnceCell::const_new(); - - MQTT_RPC - .get_or_init(|| async { - let mqtt_rpc = MqttRpc::new().await.unwrap(); - mqtt_rpc.initialize().await.unwrap(); - mqtt_rpc - }) - .await - } - - pub async fn new() -> anyhow::Result { - let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883") - .map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?; - - client.set_connected_callback(|client| { - info!("MQTT server connected."); - - client.subscribe("display-ambient-light/board/#", mqtt::QOS_1); - - client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1); - }); - client.set_connection_lost_callback(|_| { - info!("MQTT server connection lost."); - }); - client.set_disconnected_callback(|_, a1, a2| { - info!("MQTT server disconnected. {:?} {:?}", a1, a2); - }); - - let mut last_will_payload = serde_json::Map::new(); - last_will_payload.insert("message".to_string(), json!("offline")); - last_will_payload.insert( - "time".to_string(), - serde_json::Value::String( - OffsetDateTime::now_utc() - .format(&time::format_description::well_known::iso8601::Iso8601::DEFAULT) - .unwrap() - .to_string(), - ), - ); - - let last_will = mqtt::Message::new( - format!("{}/status", DESKTOP_TOPIC), - serde_json::to_string(&last_will_payload) - .unwrap() - .as_bytes(), - mqtt::QOS_1, - ); - - let connect_options = mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(5)) - .will_message(last_will) - .automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5)) - .finalize(); - - let token = client.connect(connect_options); - - token.await.map_err(|err| { - anyhow::anyhow!( - "can not connect MQTT server. wait for connect token failed. {:?}", - err - ) - })?; - - // let (change_display_brightness_tx, _) = - // broadcast::channel::(16); - // let (message_tx, _) = broadcast::channel::(32); - Ok(Self { client }) - } - - pub async fn initialize(&self) -> anyhow::Result<()> { - self.broadcast_desktop_online(); - Self::publish_color_calibration_worker(); - anyhow::Ok(()) - } - - fn publish_color_calibration_worker() { - tokio::spawn(async move { - let mqtt = Self::global().await; - let config_manager = ConfigManager::global().await; - let mut config_receiver = config_manager.clone_config_update_receiver(); - - let config = config_manager.configs().await; - if let Err(err) = mqtt - .publish_color_calibration(config.color_calibration) - .await - { - warn!("can not publish color calibration. {}", err); - } - - while config_receiver.changed().await.is_ok() { - let config = config_receiver.borrow().clone(); - if let Err(err) = mqtt - .publish_color_calibration(config.color_calibration) - .await - { - warn!("can not publish color calibration. {}", err); - } - } - }); - } - - fn broadcast_desktop_online(&self) { - let client = self.client.to_owned(); - task::spawn(async move { - loop { - match OffsetDateTime::now_utc() - .format(&format_description::well_known::Iso8601::DEFAULT) - { - Ok(now_str) => { - let msg = mqtt::Message::new( - "display-ambient-light/desktop/online", - now_str.as_bytes(), - mqtt::QOS_0, - ); - match client.publish(msg).await { - Ok(_) => {} - Err(error) => { - warn!("can not publish last online time. {}", error) - } - } - } - Err(error) => { - warn!("can not get time for now. {}", error); - } - } - tokio::time::sleep(Duration::from_millis(1000)).await; - } - }); - } - - pub async fn publish_color_calibration(&self, payload: ColorCalibration) -> anyhow::Result<()> { - self.client - .publish(mqtt::Message::new( - COLOR_CALIBRATION, - payload.to_bytes(), - mqtt::QOS_1, - )) - .await - .map_err(|error| anyhow::anyhow!("mqtt publish color calibration failed. {}", error)) - } -}