From 6e65ef1a4d260b21d6b897060ccdf025f60d5869 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 19 Feb 2023 10:13:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=80=9A=E7=9F=A5=E5=BD=93=E5=89=8D?= =?UTF-8?q?=E4=BA=AE=E5=BA=A6=EF=BC=8C=E9=87=8D=E6=9E=84=20mqtt=20?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81=E3=80=82#5.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-tauri/Cargo.lock | 42 +++++ src-tauri/Cargo.toml | 1 + src-tauri/src/display/manager.rs | 10 +- src-tauri/src/main.rs | 7 +- src-tauri/src/models/config_display_cmd.rs | 10 + src-tauri/src/models/control_value.rs | 8 + src-tauri/src/models/mod.rs | 8 + src-tauri/src/models/mq_message.rs | 11 ++ src-tauri/src/rpc/manager.rs | 9 +- src-tauri/src/rpc/mqtt.rs | 204 +++++++++++++++------ 10 files changed, 246 insertions(+), 64 deletions(-) create mode 100644 src-tauri/src/models/config_display_cmd.rs create mode 100644 src-tauri/src/models/control_value.rs create mode 100644 src-tauri/src/models/mod.rs create mode 100644 src-tauri/src/models/mq_message.rs diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 54a23b7..57d457d 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -442,6 +442,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cmake" +version = "0.1.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db34956e100b30725f2eb215f90d4871051239535632f84fea3bc92722c66b7c" +dependencies = [ + "cc", +] + [[package]] name = "cocoa" version = "0.24.1" @@ -855,6 +864,7 @@ dependencies = [ "macos-app-nap", "mdns", "once_cell", + "paho-mqtt", "paris", "redb", "rumqttc", @@ -1147,6 +1157,12 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.25" @@ -2401,6 +2417,32 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "paho-mqtt" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3934b32a8321d0f64dee17c97ea1ffbf78d75f850ff13f7dd33c6e5438e32bf" +dependencies = [ + "async-channel", + "crossbeam-channel", + "futures", + "futures-timer", + "libc", + "log", + "paho-mqtt-sys", + "thiserror", +] + +[[package]] +name = "paho-mqtt-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f92767091deda553bdb716820e5c168e2fbeb48c2f7f27a3d459b84cd53d680c" +dependencies = [ + "cmake", + "openssl-sys", +] + [[package]] name = "pango" version = "0.15.10" diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 8cfb640..0ea7d7d 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -38,6 +38,7 @@ mdns = "3.0.0" macos-app-nap = "0.0.1" ddc-hi = "0.4.1" redb = "0.13.0" +paho-mqtt = "0.12.0" [features] # by default Tauri runs in production mode diff --git a/src-tauri/src/display/manager.rs b/src-tauri/src/display/manager.rs index b793cf5..88b789b 100644 --- a/src-tauri/src/display/manager.rs +++ b/src-tauri/src/display/manager.rs @@ -12,7 +12,7 @@ use tauri::async_runtime::Mutex; use tokio::sync::{broadcast, OwnedMutexGuard}; use tracing::warn; -use crate::{display::Brightness, rpc}; +use crate::{display::Brightness, models, rpc}; use super::{display_config::DisplayConfig, DisplayBrightness}; use ddc_hi::Ddc; @@ -154,6 +154,14 @@ impl Manager { .handle .set_vcp_feature(0x10, target as u16) .map_err(|err| anyhow::anyhow!("can not set brightness. {:?}", err))?; + + let rpc = rpc::Manager::global().await; + + rpc.publish_desktop_cmd(models::MqMessage::Brightness(models::ConfigDisplayCmd { + display_index: config.id, + value: models::ControlValue::Absolute(config.brightness), + })) + .await; } Err(err) => { info!( diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 5b2c499..24cbdab 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -9,6 +9,7 @@ mod db; mod display; mod picker; mod rpc; +mod models; use crate::core::AmbientLightMode; use crate::core::CoreManager; @@ -94,8 +95,10 @@ async fn play_mode(target_mode: AmbientLightMode) { #[tokio::main] async fn main() { - let displayManager = display::Manager::global(); - tokio::spawn(displayManager.subscribe_display_brightness()); + let display_manager = display::Manager::global(); + tokio::spawn(display_manager.subscribe_display_brightness()); + let rpc_manager = rpc::Manager::global().await; + tokio::spawn(rpc_manager.listen()); tauri::Builder::default() .invoke_handler(tauri::generate_handler![ diff --git a/src-tauri/src/models/config_display_cmd.rs b/src-tauri/src/models/config_display_cmd.rs new file mode 100644 index 0000000..cd5dade --- /dev/null +++ b/src-tauri/src/models/config_display_cmd.rs @@ -0,0 +1,10 @@ +use serde::{Serialize, Deserialize}; + +use super::control_value::ControlValue; + + +#[derive(Clone, Copy, Serialize, Deserialize, Debug)] +pub struct ConfigDisplayCmd { + pub display_index: usize, + pub value: T, +} diff --git a/src-tauri/src/models/control_value.rs b/src-tauri/src/models/control_value.rs new file mode 100644 index 0000000..f06236b --- /dev/null +++ b/src-tauri/src/models/control_value.rs @@ -0,0 +1,8 @@ +use serde::{Serialize, Deserialize}; + + +#[derive(Clone, Copy, Serialize, Deserialize, Debug)] +pub enum ControlValue { + Absolute(AT), + Relative(RT), +} \ No newline at end of file diff --git a/src-tauri/src/models/mod.rs b/src-tauri/src/models/mod.rs new file mode 100644 index 0000000..cf8070d --- /dev/null +++ b/src-tauri/src/models/mod.rs @@ -0,0 +1,8 @@ +mod control_value; +mod mq_message; +mod config_display_cmd; + +pub use control_value::*; +pub use mq_message::*; +pub use config_display_cmd::*; + diff --git a/src-tauri/src/models/mq_message.rs b/src-tauri/src/models/mq_message.rs new file mode 100644 index 0000000..794ba41 --- /dev/null +++ b/src-tauri/src/models/mq_message.rs @@ -0,0 +1,11 @@ +use serde::{Serialize, Deserialize}; + +use super::ConfigDisplayCmd; + + +#[derive(Clone, Copy, Serialize, Deserialize, Debug)] +pub enum MqMessage { + Brightness(ConfigDisplayCmd), + Contrast(ConfigDisplayCmd), + PresetMode(ConfigDisplayCmd), +} \ No newline at end of file diff --git a/src-tauri/src/rpc/manager.rs b/src-tauri/src/rpc/manager.rs index cc8f0db..9e7aea9 100644 --- a/src-tauri/src/rpc/manager.rs +++ b/src-tauri/src/rpc/manager.rs @@ -1,7 +1,7 @@ use paris::error; use tokio::sync::{broadcast, OnceCell}; -use crate::{display, picker::led_color::LedColor}; +use crate::{display, picker::led_color::LedColor, models}; use super::mqtt::MqttRpc; @@ -32,6 +32,10 @@ impl Manager { } } + pub async fn listen(&self) { + self.client.listen().await + } + pub async fn publish_led_colors(&self, colors: &Vec) -> anyhow::Result<()> { let payload = colors .iter() @@ -45,6 +49,9 @@ impl Manager { pub async fn publish_led_sub_pixels(&self, payload: Vec) -> anyhow::Result<()> { self.client.publish_led_sub_pixels(payload).await } + pub async fn publish_desktop_cmd(&self, msg: models::MqMessage) -> anyhow::Result<()> { + self.client.publish_desktop_cmd(msg).await + } pub fn client(&self) -> &MqttRpc { &self.client diff --git a/src-tauri/src/rpc/mqtt.rs b/src-tauri/src/rpc/mqtt.rs index 72697db..bd56ecb 100644 --- a/src-tauri/src/rpc/mqtt.rs +++ b/src-tauri/src/rpc/mqtt.rs @@ -1,88 +1,157 @@ -use crate::display; +use crate::{display, models}; use image::EncodableLayout; -use paris::warn; -use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; -use std::time::Duration; +use paris::{warn, info, error}; +use rumqttc::{ + AsyncClient, ConnectReturnCode, Event, EventLoop, Incoming, MqttOptions, Outgoing, QoS, +}; +use std::{borrow::Borrow, rc::Rc, sync::Arc, time::Duration}; +use tauri::async_runtime::{Mutex, TokioJoinHandle}; use time::{format_description, OffsetDateTime}; -use tokio::{sync::broadcast, task}; +use tokio::{sync::broadcast, task, time::sleep}; const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness"; +const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd"; +const DESKTOP_SEND_CMD: &'static str = "display-ambient-light/desktop/cmd"; pub struct MqttRpc { client: AsyncClient, change_display_brightness_tx: broadcast::Sender, + message_tx: broadcast::Sender, + eventloop: Arc>, } impl MqttRpc { pub fn new() -> Self { let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883); options.set_keep_alive(Duration::from_secs(5)); + options.set_clean_session(false); let (client, mut eventloop) = AsyncClient::new(options, 10); let (change_display_brightness_tx, _) = broadcast::channel::(16); - - let change_display_brightness_tx2 = change_display_brightness_tx.clone(); - - task::spawn(async move { - loop { - match eventloop.poll().await { - Ok(notification) => { - let handled = || -> anyhow::Result<()> { - // println!("MQTT notification = {:?}", notification); - if let Event::Incoming(notification) = notification { - if let Incoming::Publish(notification) = notification { - match notification.topic.as_str() { - DISPLAY_BRIGHTNESS_TOPIC => { - let payload_text = String::from_utf8( - notification.payload.as_bytes().to_owned(), - ) - .map_err(|err| { - anyhow::anyhow!("can not parse json. {:?}", err) - })?; - let display_brightness: display::DisplayBrightness = - serde_json::from_str(payload_text.as_str()) - .map_err(|err| { - anyhow::anyhow!( - "can not deserialize display brightness. {:?}", - err - ) - })?; - change_display_brightness_tx2 - .send(display_brightness) - .map_err(|err| { - anyhow::anyhow!( - "can not broadcast display brightness. {:?}", - err - ) - })?; - } - &_ => {} - }; - } - } - Ok(()) - }; - if let Err(err) = handled() { - warn!("handle notification was failed. Error: {:?}", err); - } - } - Err(err) => { - println!("MQTT Error Event = {:?}", err); - } - } - } - }); + let (message_tx, _) = broadcast::channel::(32); Self { client, change_display_brightness_tx, + message_tx, + eventloop: Arc::new(Mutex::new(eventloop)), } } - pub async fn initialize(&mut self) -> anyhow::Result<()> { + pub async fn listen(&self) { + let change_display_brightness_tx2 = self.change_display_brightness_tx.clone(); + let message_tx_cloned = self.message_tx.clone(); + + let mut eventloop = self.eventloop.clone().lock_owned().await; + + loop { + match eventloop.poll().await { + Ok(notification) => { + if let Event::Incoming(notification) = notification { + if let Incoming::Publish(notification) = notification { + match notification.topic.as_str() { + DISPLAY_BRIGHTNESS_TOPIC => { + let payload_text = String::from_utf8( + notification.payload.as_bytes().to_owned(), + ); + match payload_text { + Ok(payload_text) => { + let display_brightness: Result< + display::DisplayBrightness, + _, + > = serde_json::from_str(payload_text.as_str()); + match display_brightness { + Ok(display_brightness) => { + match change_display_brightness_tx2 + .send(display_brightness) + { + Ok(_) => {} + Err(err) => { + warn!( + "can not broadcast display brightness. {:?}", + err + ); + } + }; + } + Err(err) => { + warn!( + "can not deserialize display brightness. {:?}", + err + ); + } + } + } + Err(err) => { + warn!( + "can not decode display brightness message. {:?}", + err + ); + } + }; + } + BOARD_SEND_CMD => { + let payload_text = String::from_utf8( + notification.payload.as_bytes().to_owned(), + ); + match payload_text { + Ok(payload_text) => { + let message: Result = + serde_json::from_str(payload_text.as_str()); + match message { + Ok(message) => { + match message_tx_cloned.send(message) { + Ok(_) => {} + Err(err) => { + warn!( + "can not broadcast mq message. {:?}", + err + ); + } + }; + } + Err(err) => { + warn!( + "can not deserialize mq message. {:?}", + err + ); + } + } + } + Err(err) => { + warn!("can not decode mq message message. {:?}", err); + } + }; + } + &_ => {} + }; + } else if let Incoming::ConnAck(connAck) = notification { + if connAck.code == ConnectReturnCode::Success { + match self.initialize().await { + Ok(_) => { + info!("resubscribe topics!"); + } + Err(err) => { + info!("resubscribe topics failed! {:?}", err); + } + }; + } + } else if let Incoming::Disconnect = notification { + error!("MQTT Disconnected!"); + } + } + } + Err(err) => { + println!("MQTT Error Event = {:?}", err); + } + } + } + } + + pub async fn initialize(&self) -> anyhow::Result<()> { self.subscribe_board().await?; self.subscribe_display().await?; self.broadcast_desktop_online(); @@ -102,7 +171,7 @@ impl MqttRpc { .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) } - fn broadcast_desktop_online(&mut self) { + fn broadcast_desktop_online(&self) { let client = self.client.to_owned(); task::spawn(async move { loop { @@ -146,7 +215,22 @@ impl MqttRpc { .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) } - pub fn subscribe_change_display_brightness_rx(&self) -> broadcast::Receiver { + pub fn subscribe_change_display_brightness_rx( + &self, + ) -> broadcast::Receiver { self.change_display_brightness_tx.subscribe() } + pub async fn publish_desktop_cmd(&self, msg: models::MqMessage) -> anyhow::Result<()> { + let str = serde_json::to_string(&msg) + .map_err(|err| anyhow::anyhow!("can not serialize {:?}. {:?}", msg, err))?; + self.client + .publish( + DESKTOP_SEND_CMD, + rumqttc::QoS::AtLeastOnce, + false, + str.as_bytes(), + ) + .await + .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) + } }