feat: 改用 udp 向板子发送颜色校准信息。
This commit is contained in:
parent
98d2f7891a
commit
ed72bdfdb1
@ -1,7 +1,7 @@
|
|||||||
use std::{borrow::BorrowMut, sync::Arc};
|
use std::{borrow::BorrowMut, sync::Arc};
|
||||||
|
|
||||||
use tauri::async_runtime::RwLock;
|
use tauri::async_runtime::RwLock;
|
||||||
use tokio::sync::OnceCell;
|
use tokio::{sync::OnceCell, task::yield_now};
|
||||||
|
|
||||||
use crate::ambient_light::{config, LedStripConfigGroup};
|
use crate::ambient_light::{config, LedStripConfigGroup};
|
||||||
|
|
||||||
@ -9,7 +9,6 @@ use super::{Border, SamplePointMapper, ColorCalibration};
|
|||||||
|
|
||||||
pub struct ConfigManager {
|
pub struct ConfigManager {
|
||||||
config: Arc<RwLock<LedStripConfigGroup>>,
|
config: Arc<RwLock<LedStripConfigGroup>>,
|
||||||
config_update_receiver: tokio::sync::watch::Receiver<LedStripConfigGroup>,
|
|
||||||
config_update_sender: tokio::sync::watch::Sender<LedStripConfigGroup>,
|
config_update_sender: tokio::sync::watch::Sender<LedStripConfigGroup>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -22,10 +21,12 @@ impl ConfigManager {
|
|||||||
let (config_update_sender, config_update_receiver) =
|
let (config_update_sender, config_update_receiver) =
|
||||||
tokio::sync::watch::channel(configs.clone());
|
tokio::sync::watch::channel(configs.clone());
|
||||||
|
|
||||||
config_update_sender.send(configs.clone()).unwrap();
|
if let Err(err) = config_update_sender.send(configs.clone()) {
|
||||||
|
log::error!("Failed to send config update when read config first time: {}", err);
|
||||||
|
}
|
||||||
|
drop(config_update_receiver);
|
||||||
ConfigManager {
|
ConfigManager {
|
||||||
config: Arc::new(RwLock::new(configs)),
|
config: Arc::new(RwLock::new(configs)),
|
||||||
config_update_receiver,
|
|
||||||
config_update_sender,
|
config_update_sender,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -46,8 +47,9 @@ impl ConfigManager {
|
|||||||
self.config_update_sender
|
self.config_update_sender
|
||||||
.send(configs.clone())
|
.send(configs.clone())
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to send config update: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to send config update: {}", e))?;
|
||||||
|
yield_now().await;
|
||||||
|
|
||||||
// log::info!("config updated: {:?}", configs);
|
log::debug!("config updated: {:?}", configs);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -221,7 +223,7 @@ impl ConfigManager {
|
|||||||
pub fn clone_config_update_receiver(
|
pub fn clone_config_update_receiver(
|
||||||
&self,
|
&self,
|
||||||
) -> tokio::sync::watch::Receiver<LedStripConfigGroup> {
|
) -> tokio::sync::watch::Receiver<LedStripConfigGroup> {
|
||||||
self.config_update_receiver.clone()
|
self.config_update_sender.subscribe()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn set_color_calibration(&self, color_calibration: ColorCalibration) -> anyhow::Result<()> {
|
pub async fn set_color_calibration(&self, color_calibration: ColorCalibration) -> anyhow::Result<()> {
|
||||||
|
@ -13,7 +13,7 @@ use ambient_light::{Border, ColorCalibration, LedStripConfig, LedStripConfigGrou
|
|||||||
use display::{DisplayManager, DisplayState};
|
use display::{DisplayManager, DisplayState};
|
||||||
use display_info::DisplayInfo;
|
use display_info::DisplayInfo;
|
||||||
use paris::{error, info, warn};
|
use paris::{error, info, warn};
|
||||||
use rpc::{BoardInfo, MqttRpc, UdpRpc};
|
use rpc::{BoardInfo, UdpRpc};
|
||||||
use screenshot::Screenshot;
|
use screenshot::Screenshot;
|
||||||
use screenshot_manager::ScreenshotManager;
|
use screenshot_manager::ScreenshotManager;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -223,8 +223,6 @@ async fn main() {
|
|||||||
let led_color_publisher = ambient_light::LedColorsPublisher::global().await;
|
let led_color_publisher = ambient_light::LedColorsPublisher::global().await;
|
||||||
led_color_publisher.start();
|
led_color_publisher.start();
|
||||||
|
|
||||||
let _mqtt = MqttRpc::global().await;
|
|
||||||
|
|
||||||
let _volume = VolumeManager::global().await;
|
let _volume = VolumeManager::global().await;
|
||||||
|
|
||||||
tauri::Builder::default()
|
tauri::Builder::default()
|
||||||
@ -375,8 +373,7 @@ async fn main() {
|
|||||||
let app_handle = app.handle().clone();
|
let app_handle = app.handle().clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let config_manager = ambient_light::ConfigManager::global().await;
|
let config_manager = ambient_light::ConfigManager::global().await;
|
||||||
let config_update_receiver = config_manager.clone_config_update_receiver();
|
let mut config_update_receiver = config_manager.clone_config_update_receiver();
|
||||||
let mut config_update_receiver = config_update_receiver;
|
|
||||||
loop {
|
loop {
|
||||||
if let Err(err) = config_update_receiver.changed().await {
|
if let Err(err) = config_update_receiver.changed().await {
|
||||||
error!("config update receiver changed error: {}", err);
|
error!("config update receiver changed error: {}", err);
|
||||||
|
@ -3,7 +3,11 @@ use std::{sync::Arc, time::Duration};
|
|||||||
use paris::{error, info, warn};
|
use paris::{error, info, warn};
|
||||||
use tokio::{io, net::UdpSocket, sync::RwLock, task::yield_now, time::timeout};
|
use tokio::{io, net::UdpSocket, sync::RwLock, task::yield_now, time::timeout};
|
||||||
|
|
||||||
use crate::{rpc::DisplaySettingRequest, volume::{VolumeManager, self}};
|
use crate::{
|
||||||
|
ambient_light::{ConfigManager, LedStripConfig},
|
||||||
|
rpc::DisplaySettingRequest,
|
||||||
|
volume::{self, VolumeManager},
|
||||||
|
};
|
||||||
|
|
||||||
use super::{BoardConnectStatus, BoardInfo, BoardMessageChannels};
|
use super::{BoardConnectStatus, BoardInfo, BoardMessageChannels};
|
||||||
|
|
||||||
@ -14,6 +18,7 @@ pub struct Board {
|
|||||||
listen_handler: Option<tokio::task::JoinHandle<()>>,
|
listen_handler: Option<tokio::task::JoinHandle<()>>,
|
||||||
volume_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
|
volume_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
|
||||||
state_of_displays_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
|
state_of_displays_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
led_strip_config_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Board {
|
impl Board {
|
||||||
@ -24,6 +29,7 @@ impl Board {
|
|||||||
listen_handler: None,
|
listen_handler: None,
|
||||||
volume_changed_subscriber_handler: None,
|
volume_changed_subscriber_handler: None,
|
||||||
state_of_displays_changed_subscriber_handler: None,
|
state_of_displays_changed_subscriber_handler: None,
|
||||||
|
led_strip_config_changed_subscriber_handler: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,7 +88,8 @@ impl Board {
|
|||||||
self.listen_handler = Some(handler);
|
self.listen_handler = Some(handler);
|
||||||
|
|
||||||
self.subscribe_volume_changed().await;
|
self.subscribe_volume_changed().await;
|
||||||
self.state_of_displays_changed().await;
|
self.subscribe_state_of_displays_changed().await;
|
||||||
|
self.subscribe_led_strip_config_changed().await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -95,17 +102,18 @@ impl Board {
|
|||||||
|
|
||||||
let handler = tokio::spawn(async move {
|
let handler = tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
let volume: Result<f32, tokio::sync::broadcast::error::RecvError> = volume_changed_rx.recv().await;
|
let volume: Result<f32, tokio::sync::broadcast::error::RecvError> =
|
||||||
|
volume_changed_rx.recv().await;
|
||||||
if let Err(err) = volume {
|
if let Err(err) = volume {
|
||||||
match err {
|
match err {
|
||||||
tokio::sync::broadcast::error::RecvError::Closed => {
|
tokio::sync::broadcast::error::RecvError::Closed => {
|
||||||
log::error!("volume changed channel closed");
|
log::error!("volume changed channel closed");
|
||||||
break;
|
break;
|
||||||
},
|
}
|
||||||
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
|
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
|
||||||
log::info!("volume changed channel lagged");
|
log::info!("volume changed channel lagged");
|
||||||
continue;
|
continue;
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,25 +152,28 @@ impl Board {
|
|||||||
self.volume_changed_subscriber_handler = Some(handler);
|
self.volume_changed_subscriber_handler = Some(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn state_of_displays_changed(&mut self) {
|
async fn subscribe_state_of_displays_changed(&mut self) {
|
||||||
let channel: &BoardMessageChannels = BoardMessageChannels::global().await;
|
let channel: &BoardMessageChannels = BoardMessageChannels::global().await;
|
||||||
let mut state_of_displays_changed_rx = channel.displays_changed_sender.subscribe();
|
let mut state_of_displays_changed_rx = channel.displays_changed_sender.subscribe();
|
||||||
let info = self.info.clone();
|
let info = self.info.clone();
|
||||||
let socket = self.socket.clone();
|
let socket = self.socket.clone();
|
||||||
|
|
||||||
let handler = tokio::spawn(async move {
|
let handler = tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
let states: Result<Vec<crate::display::DisplayState>, tokio::sync::broadcast::error::RecvError> = state_of_displays_changed_rx.recv().await;
|
let states: Result<
|
||||||
|
Vec<crate::display::DisplayState>,
|
||||||
|
tokio::sync::broadcast::error::RecvError,
|
||||||
|
> = state_of_displays_changed_rx.recv().await;
|
||||||
if let Err(err) = states {
|
if let Err(err) = states {
|
||||||
match err {
|
match err {
|
||||||
tokio::sync::broadcast::error::RecvError::Closed => {
|
tokio::sync::broadcast::error::RecvError::Closed => {
|
||||||
log::error!("state of displays changed channel closed");
|
log::error!("state of displays changed channel closed");
|
||||||
break;
|
break;
|
||||||
},
|
}
|
||||||
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
|
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
|
||||||
log::info!("state of displays changed channel lagged");
|
log::info!("state of displays changed channel lagged");
|
||||||
continue;
|
continue;
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,13 +198,45 @@ impl Board {
|
|||||||
log::warn!("send state of displays changed failed: {:?}", err);
|
log::warn!("send state of displays changed failed: {:?}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
self.state_of_displays_changed_subscriber_handler = Some(handler);
|
self.state_of_displays_changed_subscriber_handler = Some(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn subscribe_led_strip_config_changed(&mut self) {
|
||||||
|
let config_manager = ConfigManager::global().await;
|
||||||
|
let mut led_strip_config_changed_rx = config_manager.clone_config_update_receiver();
|
||||||
|
let info = self.info.clone();
|
||||||
|
let socket = self.socket.clone();
|
||||||
|
|
||||||
|
let handler = tokio::spawn(async move {
|
||||||
|
while led_strip_config_changed_rx.changed().await.is_ok() {
|
||||||
|
let config = led_strip_config_changed_rx.borrow().clone();
|
||||||
|
|
||||||
|
let info = info.read().await;
|
||||||
|
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
|
||||||
|
log::info!("board is not connected, skip send led strip config changed");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let socket = socket.as_ref().unwrap();
|
||||||
|
|
||||||
|
let mut buf = [0u8; 4];
|
||||||
|
buf[0] = 5;
|
||||||
|
buf[1..].copy_from_slice(&config.color_calibration.to_bytes());
|
||||||
|
|
||||||
|
log::info!("send led strip config changed: {:?}", &buf[..]);
|
||||||
|
|
||||||
|
if let Err(err) = socket.send(&buf).await {
|
||||||
|
log::warn!("send led strip config changed failed: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
self.led_strip_config_changed_subscriber_handler = Some(handler);
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn send_colors(&self, buf: &[u8]) {
|
pub async fn send_colors(&self, buf: &[u8]) {
|
||||||
let info = self.info.read().await;
|
let info = self.info.read().await;
|
||||||
if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
|
if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
|
||||||
@ -278,5 +321,8 @@ impl Drop for Board {
|
|||||||
handler.abort();
|
handler.abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(handler) = self.led_strip_config_changed_subscriber_handler.take() {
|
||||||
|
handler.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,10 @@
|
|||||||
mod board_info;
|
mod board_info;
|
||||||
mod mqtt;
|
|
||||||
mod udp;
|
mod udp;
|
||||||
mod board;
|
mod board;
|
||||||
mod display_setting_request;
|
mod display_setting_request;
|
||||||
mod channels;
|
mod channels;
|
||||||
|
|
||||||
pub use board_info::*;
|
pub use board_info::*;
|
||||||
pub use mqtt::*;
|
|
||||||
pub use udp::*;
|
pub use udp::*;
|
||||||
pub use board::*;
|
pub use board::*;
|
||||||
pub use display_setting_request::*;
|
pub use display_setting_request::*;
|
||||||
|
@ -1,163 +0,0 @@
|
|||||||
use paho_mqtt as mqtt;
|
|
||||||
use paris::{info, warn};
|
|
||||||
use serde_json::json;
|
|
||||||
use std::time::Duration;
|
|
||||||
use time::{format_description, OffsetDateTime};
|
|
||||||
use tokio::{sync::OnceCell, task};
|
|
||||||
|
|
||||||
use crate::ambient_light::{ColorCalibration, ConfigManager};
|
|
||||||
|
|
||||||
const DISPLAY_TOPIC: &'static str = "display-ambient-light/display";
|
|
||||||
const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop";
|
|
||||||
const COLOR_CALIBRATION: &'static str = "display-ambient-light/desktop/color-calibration";
|
|
||||||
|
|
||||||
pub struct MqttRpc {
|
|
||||||
client: mqtt::AsyncClient,
|
|
||||||
// change_display_brightness_tx: broadcast::Sender<display::DisplayBrightness>,
|
|
||||||
// message_tx: broadcast::Sender<models::CmdMqMessage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MqttRpc {
|
|
||||||
pub async fn global() -> &'static Self {
|
|
||||||
static MQTT_RPC: OnceCell<MqttRpc> = OnceCell::const_new();
|
|
||||||
|
|
||||||
MQTT_RPC
|
|
||||||
.get_or_init(|| async {
|
|
||||||
let mqtt_rpc = MqttRpc::new().await.unwrap();
|
|
||||||
mqtt_rpc.initialize().await.unwrap();
|
|
||||||
mqtt_rpc
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn new() -> anyhow::Result<Self> {
|
|
||||||
let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883")
|
|
||||||
.map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?;
|
|
||||||
|
|
||||||
client.set_connected_callback(|client| {
|
|
||||||
info!("MQTT server connected.");
|
|
||||||
|
|
||||||
client.subscribe("display-ambient-light/board/#", mqtt::QOS_1);
|
|
||||||
|
|
||||||
client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1);
|
|
||||||
});
|
|
||||||
client.set_connection_lost_callback(|_| {
|
|
||||||
info!("MQTT server connection lost.");
|
|
||||||
});
|
|
||||||
client.set_disconnected_callback(|_, a1, a2| {
|
|
||||||
info!("MQTT server disconnected. {:?} {:?}", a1, a2);
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut last_will_payload = serde_json::Map::new();
|
|
||||||
last_will_payload.insert("message".to_string(), json!("offline"));
|
|
||||||
last_will_payload.insert(
|
|
||||||
"time".to_string(),
|
|
||||||
serde_json::Value::String(
|
|
||||||
OffsetDateTime::now_utc()
|
|
||||||
.format(&time::format_description::well_known::iso8601::Iso8601::DEFAULT)
|
|
||||||
.unwrap()
|
|
||||||
.to_string(),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
let last_will = mqtt::Message::new(
|
|
||||||
format!("{}/status", DESKTOP_TOPIC),
|
|
||||||
serde_json::to_string(&last_will_payload)
|
|
||||||
.unwrap()
|
|
||||||
.as_bytes(),
|
|
||||||
mqtt::QOS_1,
|
|
||||||
);
|
|
||||||
|
|
||||||
let connect_options = mqtt::ConnectOptionsBuilder::new()
|
|
||||||
.keep_alive_interval(Duration::from_secs(5))
|
|
||||||
.will_message(last_will)
|
|
||||||
.automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5))
|
|
||||||
.finalize();
|
|
||||||
|
|
||||||
let token = client.connect(connect_options);
|
|
||||||
|
|
||||||
token.await.map_err(|err| {
|
|
||||||
anyhow::anyhow!(
|
|
||||||
"can not connect MQTT server. wait for connect token failed. {:?}",
|
|
||||||
err
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// let (change_display_brightness_tx, _) =
|
|
||||||
// broadcast::channel::<display::DisplayBrightness>(16);
|
|
||||||
// let (message_tx, _) = broadcast::channel::<models::CmdMqMessage>(32);
|
|
||||||
Ok(Self { client })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn initialize(&self) -> anyhow::Result<()> {
|
|
||||||
self.broadcast_desktop_online();
|
|
||||||
Self::publish_color_calibration_worker();
|
|
||||||
anyhow::Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn publish_color_calibration_worker() {
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mqtt = Self::global().await;
|
|
||||||
let config_manager = ConfigManager::global().await;
|
|
||||||
let mut config_receiver = config_manager.clone_config_update_receiver();
|
|
||||||
|
|
||||||
let config = config_manager.configs().await;
|
|
||||||
if let Err(err) = mqtt
|
|
||||||
.publish_color_calibration(config.color_calibration)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
warn!("can not publish color calibration. {}", err);
|
|
||||||
}
|
|
||||||
|
|
||||||
while config_receiver.changed().await.is_ok() {
|
|
||||||
let config = config_receiver.borrow().clone();
|
|
||||||
if let Err(err) = mqtt
|
|
||||||
.publish_color_calibration(config.color_calibration)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
warn!("can not publish color calibration. {}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn broadcast_desktop_online(&self) {
|
|
||||||
let client = self.client.to_owned();
|
|
||||||
task::spawn(async move {
|
|
||||||
loop {
|
|
||||||
match OffsetDateTime::now_utc()
|
|
||||||
.format(&format_description::well_known::Iso8601::DEFAULT)
|
|
||||||
{
|
|
||||||
Ok(now_str) => {
|
|
||||||
let msg = mqtt::Message::new(
|
|
||||||
"display-ambient-light/desktop/online",
|
|
||||||
now_str.as_bytes(),
|
|
||||||
mqtt::QOS_0,
|
|
||||||
);
|
|
||||||
match client.publish(msg).await {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(error) => {
|
|
||||||
warn!("can not publish last online time. {}", error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
warn!("can not get time for now. {}", error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn publish_color_calibration(&self, payload: ColorCalibration) -> anyhow::Result<()> {
|
|
||||||
self.client
|
|
||||||
.publish(mqtt::Message::new(
|
|
||||||
COLOR_CALIBRATION,
|
|
||||||
payload.to_bytes(),
|
|
||||||
mqtt::QOS_1,
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
.map_err(|error| anyhow::anyhow!("mqtt publish color calibration failed. {}", error))
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user