From e09b93432c0c1410cfb2fbe947b3801267523374 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 19 Feb 2023 16:14:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20MQTT=20client=20=E6=94=B9=E4=B8=BA=20pa?= =?UTF-8?q?ho-mqtt.=20#5.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-tauri/Cargo.lock | 128 +---------------- src-tauri/Cargo.toml | 1 - src-tauri/src/rpc/manager.rs | 14 +- src-tauri/src/rpc/mqtt.rs | 267 ++++++++++++++++++----------------- 4 files changed, 145 insertions(+), 265 deletions(-) diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 57d457d..f8a7664 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -867,7 +867,6 @@ dependencies = [ "paho-mqtt", "paris", "redb", - "rumqttc", "scrap", "serde", "serde_json", @@ -1028,7 +1027,7 @@ dependencies = [ "futures-sink", "nanorand", "pin-project", - "spin 0.9.4", + "spin", ] [[package]] @@ -2707,12 +2706,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "pollster" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7" - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -3019,40 +3012,6 @@ dependencies = [ "windows 0.37.0", ] -[[package]] -name = "ring" -version = "0.16.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" -dependencies = [ - "cc", - "libc", - "once_cell", - "spin 0.5.2", - "untrusted", - "web-sys", - "winapi 0.3.9", -] - -[[package]] -name = "rumqttc" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "499b7ab08ffa5a722958b6ce1b7c0270bea30909f589d12c5ec3a051afe423fc" -dependencies = [ - "bytes", - "flume", - "futures", - "http", - "log", - "pollster", - "rustls-native-certs", - "rustls-pemfile 0.3.0", - "thiserror", - "tokio", - "tokio-rustls", -] - [[package]] name = "rustc_version" version = "0.3.3" @@ -3071,48 +3030,6 @@ dependencies = [ "semver 1.0.16", ] -[[package]] -name = "rustls" -version = "0.20.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" -dependencies = [ - "log", - "ring", - "sct", - "webpki", -] - -[[package]] -name = "rustls-native-certs" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" -dependencies = [ - "openssl-probe", - "rustls-pemfile 1.0.1", - "schannel", - "security-framework", -] - -[[package]] -name = "rustls-pemfile" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" -dependencies = [ - "base64", -] - -[[package]] -name = "rustls-pemfile" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" -dependencies = [ - "base64", -] - [[package]] name = "rustversion" version = "1.0.11" @@ -3180,16 +3097,6 @@ dependencies = [ "winapi 0.2.8", ] -[[package]] -name = "sct" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "security-framework" version = "2.7.0" @@ -3488,12 +3395,6 @@ dependencies = [ "system-deps 5.0.0", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "spin" version = "0.9.4" @@ -4033,17 +3934,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls", - "tokio", - "webpki", -] - [[package]] name = "toml" version = "0.5.10" @@ -4185,12 +4075,6 @@ version = "0.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74c1aa4511c38276c548406f0b1f5f8b793f000cfb51e18f278a102abd057e81" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - [[package]] name = "url" version = "2.3.1" @@ -4432,16 +4316,6 @@ dependencies = [ "libwebp-sys", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webview2-com" version = "0.19.1" diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 0ea7d7d..08df5b8 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -28,7 +28,6 @@ tokio = { version = "1.22.0", features = ["full"] } tracing = "0.1.37" tracing-subscriber = "0.3.16" hex = "0.4.3" -rumqttc = "0.17.0" time = { version = "0.3.17", features = ["formatting"] } color_space = "0.5.3" futures = "0.3.25" diff --git a/src-tauri/src/rpc/manager.rs b/src-tauri/src/rpc/manager.rs index 9e7aea9..ad2317e 100644 --- a/src-tauri/src/rpc/manager.rs +++ b/src-tauri/src/rpc/manager.rs @@ -1,7 +1,7 @@ use paris::error; use tokio::sync::{broadcast, OnceCell}; -use crate::{display, picker::led_color::LedColor, models}; +use crate::{display, models, picker::led_color::LedColor}; use super::mqtt::MqttRpc; @@ -14,11 +14,13 @@ impl Manager { pub async fn global() -> &'static Self { static RPC_MANAGER: OnceCell = OnceCell::const_new(); - RPC_MANAGER.get_or_init(|| Manager::new()).await + RPC_MANAGER + .get_or_init(|| async { Manager::new().await.unwrap() }) + .await } - pub async fn new() -> Self { - let mut mqtt = MqttRpc::new(); + pub async fn new() -> anyhow::Result { + let mqtt = MqttRpc::new().await?; let initialized = match mqtt.initialize().await { Ok(_) => true, Err(err) => { @@ -26,10 +28,10 @@ impl Manager { false } }; - Self { + Ok(Self { client: mqtt, initialized, - } + }) } pub async fn listen(&self) { diff --git a/src-tauri/src/rpc/mqtt.rs b/src-tauri/src/rpc/mqtt.rs index bd56ecb..02bf836 100644 --- a/src-tauri/src/rpc/mqtt.rs +++ b/src-tauri/src/rpc/mqtt.rs @@ -1,174 +1,184 @@ use crate::{display, models}; -use image::EncodableLayout; -use paris::{warn, info, error}; -use rumqttc::{ - AsyncClient, ConnectReturnCode, Event, EventLoop, Incoming, MqttOptions, Outgoing, QoS, -}; +use futures::StreamExt; +use paho_mqtt as mqtt; +use paris::{error, info, warn}; +use serde_json::json; use std::{borrow::Borrow, rc::Rc, sync::Arc, time::Duration}; use tauri::async_runtime::{Mutex, TokioJoinHandle}; use time::{format_description, OffsetDateTime}; use tokio::{sync::broadcast, task, time::sleep}; const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; +const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop"; const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness"; const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd"; const DESKTOP_SEND_CMD: &'static str = "display-ambient-light/desktop/cmd"; pub struct MqttRpc { - client: AsyncClient, + client: mqtt::AsyncClient, change_display_brightness_tx: broadcast::Sender, message_tx: broadcast::Sender, - eventloop: Arc>, } impl MqttRpc { - pub fn new() -> Self { - let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883); - options.set_keep_alive(Duration::from_secs(5)); - options.set_clean_session(false); + pub async fn new() -> anyhow::Result { + let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883") + .map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?; - let (client, mut eventloop) = AsyncClient::new(options, 10); + client.set_connected_callback(|client| { + info!("MQTT server connected."); + + client.subscribe("display-ambient-light/board/#", mqtt::QOS_1); + + client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1); + }); + client.set_connection_lost_callback(|client| { + info!("MQTT server connection lost."); + }); + client.set_disconnected_callback(|_, a1, a2| { + info!("MQTT server disconnected. {:?} {:?}", a1, a2); + }); + + let mut last_will_payload = serde_json::Map::new(); + last_will_payload.insert("message".to_string(), json!("offline")); + last_will_payload.insert( + "time".to_string(), + serde_json::Value::String( + OffsetDateTime::now_utc() + .format( + &format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second]") + .unwrap(), + ) + .unwrap() + .to_string(), + ), + ); + + let last_will = mqtt::Message::new( + format!("{}/status", DESKTOP_TOPIC), + serde_json::to_string(&last_will_payload) + .unwrap() + .as_bytes(), + mqtt::QOS_1, + ); + + let connect_options = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(5)) + .automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5)) + .finalize(); + + let token = client.connect(connect_options); + + token.await.map_err(|err| { + anyhow::anyhow!( + "can not connect MQTT server. wait for connect token failed. {:?}", + err + ) + })?; let (change_display_brightness_tx, _) = broadcast::channel::(16); let (message_tx, _) = broadcast::channel::(32); - - Self { + Ok(Self { client, change_display_brightness_tx, message_tx, - eventloop: Arc::new(Mutex::new(eventloop)), - } + }) } pub async fn listen(&self) { let change_display_brightness_tx2 = self.change_display_brightness_tx.clone(); let message_tx_cloned = self.message_tx.clone(); - let mut eventloop = self.eventloop.clone().lock_owned().await; + let mut stream = self.client.to_owned().get_stream(100); - loop { - match eventloop.poll().await { - Ok(notification) => { - if let Event::Incoming(notification) = notification { - if let Incoming::Publish(notification) = notification { - match notification.topic.as_str() { - DISPLAY_BRIGHTNESS_TOPIC => { - let payload_text = String::from_utf8( - notification.payload.as_bytes().to_owned(), - ); - match payload_text { - Ok(payload_text) => { - let display_brightness: Result< - display::DisplayBrightness, - _, - > = serde_json::from_str(payload_text.as_str()); - match display_brightness { - Ok(display_brightness) => { - match change_display_brightness_tx2 - .send(display_brightness) - { - Ok(_) => {} - Err(err) => { - warn!( - "can not broadcast display brightness. {:?}", - err - ); - } - }; - } - Err(err) => { - warn!( - "can not deserialize display brightness. {:?}", - err - ); - } - } - } - Err(err) => { - warn!( - "can not decode display brightness message. {:?}", - err - ); - } - }; - } - BOARD_SEND_CMD => { - let payload_text = String::from_utf8( - notification.payload.as_bytes().to_owned(), - ); - match payload_text { - Ok(payload_text) => { - let message: Result = - serde_json::from_str(payload_text.as_str()); - match message { - Ok(message) => { - match message_tx_cloned.send(message) { - Ok(_) => {} - Err(err) => { - warn!( - "can not broadcast mq message. {:?}", - err - ); - } - }; - } - Err(err) => { - warn!( - "can not deserialize mq message. {:?}", + while let Some(notification) = stream.next().await { + match notification { + Some(notification) => match notification.topic() { + DISPLAY_BRIGHTNESS_TOPIC => { + let payload_text = String::from_utf8(notification.payload().to_vec()); + match payload_text { + Ok(payload_text) => { + let display_brightness: Result = + serde_json::from_str(payload_text.as_str()); + match display_brightness { + Ok(display_brightness) => { + match change_display_brightness_tx2.send(display_brightness) + { + Ok(_) => {} + Err(err) => { + warn!( + "can not send display brightness to channel. {:?}", err ); - } } } - Err(err) => { - warn!("can not decode mq message message. {:?}", err); - } - }; - } - &_ => {} - }; - } else if let Incoming::ConnAck(connAck) = notification { - if connAck.code == ConnectReturnCode::Success { - match self.initialize().await { - Ok(_) => { - info!("resubscribe topics!"); } Err(err) => { - info!("resubscribe topics failed! {:?}", err); + warn!( + "can not parse display brightness from payload. {:?}", + err + ); } - }; + } + } + Err(err) => { + warn!("can not parse display brightness from payload. {:?}", err); } - } else if let Incoming::Disconnect = notification { - error!("MQTT Disconnected!"); } } - } - Err(err) => { - println!("MQTT Error Event = {:?}", err); + BOARD_SEND_CMD => { + let payload_text = String::from_utf8(notification.payload().to_vec()); + match payload_text { + Ok(payload_text) => { + let message: Result = + serde_json::from_str(payload_text.as_str()); + match message { + Ok(message) => match message_tx_cloned.send(message) { + Ok(_) => {} + Err(err) => { + warn!("can not send message to channel. {:?}", err); + } + }, + Err(err) => { + warn!("can not parse message from payload. {:?}", err); + } + } + } + Err(err) => { + warn!("can not parse message from payload. {:?}", err); + } + } + } + _ => {} + }, + _ => { + warn!("can not get notification from MQTT server."); } } } } pub async fn initialize(&self) -> anyhow::Result<()> { - self.subscribe_board().await?; - self.subscribe_display().await?; + // self.subscribe_board()?; + // self.subscribe_display()?; self.broadcast_desktop_online(); anyhow::Ok(()) } - async fn subscribe_board(&self) -> anyhow::Result<()> { + fn subscribe_board(&self) -> anyhow::Result<()> { self.client - .subscribe("display-ambient-light/board/#", QoS::AtMostOnce) - .await + .subscribe("display-ambient-light/board/#", mqtt::QOS_1) + .wait() .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) + .map(|_| ()) } - async fn subscribe_display(&self) -> anyhow::Result<()> { + fn subscribe_display(&self) -> anyhow::Result<()> { self.client - .subscribe(format!("{}/#", DISPLAY_TOPIC), QoS::AtMostOnce) - .await + .subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1) + .wait() .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) + .map(|_| ()) } fn broadcast_desktop_online(&self) { @@ -179,15 +189,12 @@ impl MqttRpc { .format(&format_description::well_known::Iso8601::DEFAULT) { Ok(now_str) => { - match client - .publish( - "display-ambient-light/desktop/online", - QoS::AtLeastOnce, - false, - now_str.as_bytes(), - ) - .await - { + let msg = mqtt::Message::new( + "display-ambient-light/desktop/online", + now_str.as_bytes(), + mqtt::QOS_0, + ); + match client.publish(msg).await { Ok(_) => {} Err(error) => { warn!("can not publish last online time. {}", error) @@ -205,12 +212,11 @@ impl MqttRpc { pub async fn publish_led_sub_pixels(&self, payload: Vec) -> anyhow::Result<()> { self.client - .publish( + .publish(mqtt::Message::new( "display-ambient-light/desktop/colors", - rumqttc::QoS::AtLeastOnce, - false, payload, - ) + mqtt::QOS_1, + )) .await .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) } @@ -224,12 +230,11 @@ impl MqttRpc { let str = serde_json::to_string(&msg) .map_err(|err| anyhow::anyhow!("can not serialize {:?}. {:?}", msg, err))?; self.client - .publish( + .publish(mqtt::Message::new( DESKTOP_SEND_CMD, - rumqttc::QoS::AtLeastOnce, - false, str.as_bytes(), - ) + mqtt::QOS_1, + )) .await .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) }