From 2f1453dd63715d1cdd7d0c2db50c5d5ddebb638a Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 19 Feb 2023 16:14:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20MQTT=20client=20=E6=94=B9=E4=B8=BA=20pa?= =?UTF-8?q?ho-mqtt.=20#5.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-tauri/src/rpc/manager.rs | 14 +- src-tauri/src/rpc/mqtt.rs | 267 ++++++++++++++++++----------------- 2 files changed, 144 insertions(+), 137 deletions(-) diff --git a/src-tauri/src/rpc/manager.rs b/src-tauri/src/rpc/manager.rs index 9e7aea9..ad2317e 100644 --- a/src-tauri/src/rpc/manager.rs +++ b/src-tauri/src/rpc/manager.rs @@ -1,7 +1,7 @@ use paris::error; use tokio::sync::{broadcast, OnceCell}; -use crate::{display, picker::led_color::LedColor, models}; +use crate::{display, models, picker::led_color::LedColor}; use super::mqtt::MqttRpc; @@ -14,11 +14,13 @@ impl Manager { pub async fn global() -> &'static Self { static RPC_MANAGER: OnceCell = OnceCell::const_new(); - RPC_MANAGER.get_or_init(|| Manager::new()).await + RPC_MANAGER + .get_or_init(|| async { Manager::new().await.unwrap() }) + .await } - pub async fn new() -> Self { - let mut mqtt = MqttRpc::new(); + pub async fn new() -> anyhow::Result { + let mqtt = MqttRpc::new().await?; let initialized = match mqtt.initialize().await { Ok(_) => true, Err(err) => { @@ -26,10 +28,10 @@ impl Manager { false } }; - Self { + Ok(Self { client: mqtt, initialized, - } + }) } pub async fn listen(&self) { diff --git a/src-tauri/src/rpc/mqtt.rs b/src-tauri/src/rpc/mqtt.rs index bd56ecb..02bf836 100644 --- a/src-tauri/src/rpc/mqtt.rs +++ b/src-tauri/src/rpc/mqtt.rs @@ -1,174 +1,184 @@ use crate::{display, models}; -use image::EncodableLayout; -use paris::{warn, info, error}; -use rumqttc::{ - AsyncClient, ConnectReturnCode, Event, EventLoop, Incoming, MqttOptions, Outgoing, QoS, -}; +use futures::StreamExt; +use paho_mqtt as mqtt; +use paris::{error, info, warn}; +use serde_json::json; use std::{borrow::Borrow, rc::Rc, sync::Arc, time::Duration}; use tauri::async_runtime::{Mutex, TokioJoinHandle}; use time::{format_description, OffsetDateTime}; use tokio::{sync::broadcast, task, time::sleep}; const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; +const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop"; const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness"; const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd"; const DESKTOP_SEND_CMD: &'static str = "display-ambient-light/desktop/cmd"; pub struct MqttRpc { - client: AsyncClient, + client: mqtt::AsyncClient, change_display_brightness_tx: broadcast::Sender, message_tx: broadcast::Sender, - eventloop: Arc>, } impl MqttRpc { - pub fn new() -> Self { - let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883); - options.set_keep_alive(Duration::from_secs(5)); - options.set_clean_session(false); + pub async fn new() -> anyhow::Result { + let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883") + .map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?; - let (client, mut eventloop) = AsyncClient::new(options, 10); + client.set_connected_callback(|client| { + info!("MQTT server connected."); + + client.subscribe("display-ambient-light/board/#", mqtt::QOS_1); + + client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1); + }); + client.set_connection_lost_callback(|client| { + info!("MQTT server connection lost."); + }); + client.set_disconnected_callback(|_, a1, a2| { + info!("MQTT server disconnected. {:?} {:?}", a1, a2); + }); + + let mut last_will_payload = serde_json::Map::new(); + last_will_payload.insert("message".to_string(), json!("offline")); + last_will_payload.insert( + "time".to_string(), + serde_json::Value::String( + OffsetDateTime::now_utc() + .format( + &format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second]") + .unwrap(), + ) + .unwrap() + .to_string(), + ), + ); + + let last_will = mqtt::Message::new( + format!("{}/status", DESKTOP_TOPIC), + serde_json::to_string(&last_will_payload) + .unwrap() + .as_bytes(), + mqtt::QOS_1, + ); + + let connect_options = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(5)) + .automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5)) + .finalize(); + + let token = client.connect(connect_options); + + token.await.map_err(|err| { + anyhow::anyhow!( + "can not connect MQTT server. wait for connect token failed. {:?}", + err + ) + })?; let (change_display_brightness_tx, _) = broadcast::channel::(16); let (message_tx, _) = broadcast::channel::(32); - - Self { + Ok(Self { client, change_display_brightness_tx, message_tx, - eventloop: Arc::new(Mutex::new(eventloop)), - } + }) } pub async fn listen(&self) { let change_display_brightness_tx2 = self.change_display_brightness_tx.clone(); let message_tx_cloned = self.message_tx.clone(); - let mut eventloop = self.eventloop.clone().lock_owned().await; + let mut stream = self.client.to_owned().get_stream(100); - loop { - match eventloop.poll().await { - Ok(notification) => { - if let Event::Incoming(notification) = notification { - if let Incoming::Publish(notification) = notification { - match notification.topic.as_str() { - DISPLAY_BRIGHTNESS_TOPIC => { - let payload_text = String::from_utf8( - notification.payload.as_bytes().to_owned(), - ); - match payload_text { - Ok(payload_text) => { - let display_brightness: Result< - display::DisplayBrightness, - _, - > = serde_json::from_str(payload_text.as_str()); - match display_brightness { - Ok(display_brightness) => { - match change_display_brightness_tx2 - .send(display_brightness) - { - Ok(_) => {} - Err(err) => { - warn!( - "can not broadcast display brightness. {:?}", - err - ); - } - }; - } - Err(err) => { - warn!( - "can not deserialize display brightness. {:?}", - err - ); - } - } - } - Err(err) => { - warn!( - "can not decode display brightness message. {:?}", - err - ); - } - }; - } - BOARD_SEND_CMD => { - let payload_text = String::from_utf8( - notification.payload.as_bytes().to_owned(), - ); - match payload_text { - Ok(payload_text) => { - let message: Result = - serde_json::from_str(payload_text.as_str()); - match message { - Ok(message) => { - match message_tx_cloned.send(message) { - Ok(_) => {} - Err(err) => { - warn!( - "can not broadcast mq message. {:?}", - err - ); - } - }; - } - Err(err) => { - warn!( - "can not deserialize mq message. {:?}", + while let Some(notification) = stream.next().await { + match notification { + Some(notification) => match notification.topic() { + DISPLAY_BRIGHTNESS_TOPIC => { + let payload_text = String::from_utf8(notification.payload().to_vec()); + match payload_text { + Ok(payload_text) => { + let display_brightness: Result = + serde_json::from_str(payload_text.as_str()); + match display_brightness { + Ok(display_brightness) => { + match change_display_brightness_tx2.send(display_brightness) + { + Ok(_) => {} + Err(err) => { + warn!( + "can not send display brightness to channel. {:?}", err ); - } } } - Err(err) => { - warn!("can not decode mq message message. {:?}", err); - } - }; - } - &_ => {} - }; - } else if let Incoming::ConnAck(connAck) = notification { - if connAck.code == ConnectReturnCode::Success { - match self.initialize().await { - Ok(_) => { - info!("resubscribe topics!"); } Err(err) => { - info!("resubscribe topics failed! {:?}", err); + warn!( + "can not parse display brightness from payload. {:?}", + err + ); } - }; + } + } + Err(err) => { + warn!("can not parse display brightness from payload. {:?}", err); } - } else if let Incoming::Disconnect = notification { - error!("MQTT Disconnected!"); } } - } - Err(err) => { - println!("MQTT Error Event = {:?}", err); + BOARD_SEND_CMD => { + let payload_text = String::from_utf8(notification.payload().to_vec()); + match payload_text { + Ok(payload_text) => { + let message: Result = + serde_json::from_str(payload_text.as_str()); + match message { + Ok(message) => match message_tx_cloned.send(message) { + Ok(_) => {} + Err(err) => { + warn!("can not send message to channel. {:?}", err); + } + }, + Err(err) => { + warn!("can not parse message from payload. {:?}", err); + } + } + } + Err(err) => { + warn!("can not parse message from payload. {:?}", err); + } + } + } + _ => {} + }, + _ => { + warn!("can not get notification from MQTT server."); } } } } pub async fn initialize(&self) -> anyhow::Result<()> { - self.subscribe_board().await?; - self.subscribe_display().await?; + // self.subscribe_board()?; + // self.subscribe_display()?; self.broadcast_desktop_online(); anyhow::Ok(()) } - async fn subscribe_board(&self) -> anyhow::Result<()> { + fn subscribe_board(&self) -> anyhow::Result<()> { self.client - .subscribe("display-ambient-light/board/#", QoS::AtMostOnce) - .await + .subscribe("display-ambient-light/board/#", mqtt::QOS_1) + .wait() .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) + .map(|_| ()) } - async fn subscribe_display(&self) -> anyhow::Result<()> { + fn subscribe_display(&self) -> anyhow::Result<()> { self.client - .subscribe(format!("{}/#", DISPLAY_TOPIC), QoS::AtMostOnce) - .await + .subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1) + .wait() .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) + .map(|_| ()) } fn broadcast_desktop_online(&self) { @@ -179,15 +189,12 @@ impl MqttRpc { .format(&format_description::well_known::Iso8601::DEFAULT) { Ok(now_str) => { - match client - .publish( - "display-ambient-light/desktop/online", - QoS::AtLeastOnce, - false, - now_str.as_bytes(), - ) - .await - { + let msg = mqtt::Message::new( + "display-ambient-light/desktop/online", + now_str.as_bytes(), + mqtt::QOS_0, + ); + match client.publish(msg).await { Ok(_) => {} Err(error) => { warn!("can not publish last online time. {}", error) @@ -205,12 +212,11 @@ impl MqttRpc { pub async fn publish_led_sub_pixels(&self, payload: Vec) -> anyhow::Result<()> { self.client - .publish( + .publish(mqtt::Message::new( "display-ambient-light/desktop/colors", - rumqttc::QoS::AtLeastOnce, - false, payload, - ) + mqtt::QOS_1, + )) .await .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) } @@ -224,12 +230,11 @@ impl MqttRpc { let str = serde_json::to_string(&msg) .map_err(|err| anyhow::anyhow!("can not serialize {:?}. {:?}", msg, err))?; self.client - .publish( + .publish(mqtt::Message::new( DESKTOP_SEND_CMD, - rumqttc::QoS::AtLeastOnce, - false, str.as_bytes(), - ) + mqtt::QOS_1, + )) .await .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) }