feat: MQTT client 改为 paho-mqtt. #5.

This commit is contained in:
Ivan Li 2023-02-19 16:14:47 +08:00
parent 6e65ef1a4d
commit 2f1453dd63
2 changed files with 144 additions and 137 deletions

View File

@ -1,7 +1,7 @@
use paris::error; use paris::error;
use tokio::sync::{broadcast, OnceCell}; use tokio::sync::{broadcast, OnceCell};
use crate::{display, picker::led_color::LedColor, models}; use crate::{display, models, picker::led_color::LedColor};
use super::mqtt::MqttRpc; use super::mqtt::MqttRpc;
@ -14,11 +14,13 @@ impl Manager {
pub async fn global() -> &'static Self { pub async fn global() -> &'static Self {
static RPC_MANAGER: OnceCell<Manager> = OnceCell::const_new(); static RPC_MANAGER: OnceCell<Manager> = OnceCell::const_new();
RPC_MANAGER.get_or_init(|| Manager::new()).await RPC_MANAGER
.get_or_init(|| async { Manager::new().await.unwrap() })
.await
} }
pub async fn new() -> Self { pub async fn new() -> anyhow::Result<Self> {
let mut mqtt = MqttRpc::new(); let mqtt = MqttRpc::new().await?;
let initialized = match mqtt.initialize().await { let initialized = match mqtt.initialize().await {
Ok(_) => true, Ok(_) => true,
Err(err) => { Err(err) => {
@ -26,10 +28,10 @@ impl Manager {
false false
} }
}; };
Self { Ok(Self {
client: mqtt, client: mqtt,
initialized, initialized,
} })
} }
pub async fn listen(&self) { pub async fn listen(&self) {

View File

@ -1,85 +1,114 @@
use crate::{display, models}; use crate::{display, models};
use image::EncodableLayout; use futures::StreamExt;
use paris::{warn, info, error}; use paho_mqtt as mqtt;
use rumqttc::{ use paris::{error, info, warn};
AsyncClient, ConnectReturnCode, Event, EventLoop, Incoming, MqttOptions, Outgoing, QoS, use serde_json::json;
};
use std::{borrow::Borrow, rc::Rc, sync::Arc, time::Duration}; use std::{borrow::Borrow, rc::Rc, sync::Arc, time::Duration};
use tauri::async_runtime::{Mutex, TokioJoinHandle}; use tauri::async_runtime::{Mutex, TokioJoinHandle};
use time::{format_description, OffsetDateTime}; use time::{format_description, OffsetDateTime};
use tokio::{sync::broadcast, task, time::sleep}; use tokio::{sync::broadcast, task, time::sleep};
const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; const DISPLAY_TOPIC: &'static str = "display-ambient-light/display";
const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop";
const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness"; const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness";
const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd"; const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd";
const DESKTOP_SEND_CMD: &'static str = "display-ambient-light/desktop/cmd"; const DESKTOP_SEND_CMD: &'static str = "display-ambient-light/desktop/cmd";
pub struct MqttRpc { pub struct MqttRpc {
client: AsyncClient, client: mqtt::AsyncClient,
change_display_brightness_tx: broadcast::Sender<display::DisplayBrightness>, change_display_brightness_tx: broadcast::Sender<display::DisplayBrightness>,
message_tx: broadcast::Sender<models::MqMessage>, message_tx: broadcast::Sender<models::MqMessage>,
eventloop: Arc<Mutex<EventLoop>>,
} }
impl MqttRpc { impl MqttRpc {
pub fn new() -> Self { pub async fn new() -> anyhow::Result<Self> {
let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883); let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883")
options.set_keep_alive(Duration::from_secs(5)); .map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?;
options.set_clean_session(false);
let (client, mut eventloop) = AsyncClient::new(options, 10); client.set_connected_callback(|client| {
info!("MQTT server connected.");
client.subscribe("display-ambient-light/board/#", mqtt::QOS_1);
client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1);
});
client.set_connection_lost_callback(|client| {
info!("MQTT server connection lost.");
});
client.set_disconnected_callback(|_, a1, a2| {
info!("MQTT server disconnected. {:?} {:?}", a1, a2);
});
let mut last_will_payload = serde_json::Map::new();
last_will_payload.insert("message".to_string(), json!("offline"));
last_will_payload.insert(
"time".to_string(),
serde_json::Value::String(
OffsetDateTime::now_utc()
.format(
&format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second]")
.unwrap(),
)
.unwrap()
.to_string(),
),
);
let last_will = mqtt::Message::new(
format!("{}/status", DESKTOP_TOPIC),
serde_json::to_string(&last_will_payload)
.unwrap()
.as_bytes(),
mqtt::QOS_1,
);
let connect_options = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(5))
.automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5))
.finalize();
let token = client.connect(connect_options);
token.await.map_err(|err| {
anyhow::anyhow!(
"can not connect MQTT server. wait for connect token failed. {:?}",
err
)
})?;
let (change_display_brightness_tx, _) = let (change_display_brightness_tx, _) =
broadcast::channel::<display::DisplayBrightness>(16); broadcast::channel::<display::DisplayBrightness>(16);
let (message_tx, _) = broadcast::channel::<models::MqMessage>(32); let (message_tx, _) = broadcast::channel::<models::MqMessage>(32);
Ok(Self {
Self {
client, client,
change_display_brightness_tx, change_display_brightness_tx,
message_tx, message_tx,
eventloop: Arc::new(Mutex::new(eventloop)), })
}
} }
pub async fn listen(&self) { pub async fn listen(&self) {
let change_display_brightness_tx2 = self.change_display_brightness_tx.clone(); let change_display_brightness_tx2 = self.change_display_brightness_tx.clone();
let message_tx_cloned = self.message_tx.clone(); let message_tx_cloned = self.message_tx.clone();
let mut eventloop = self.eventloop.clone().lock_owned().await; let mut stream = self.client.to_owned().get_stream(100);
loop { while let Some(notification) = stream.next().await {
match eventloop.poll().await { match notification {
Ok(notification) => { Some(notification) => match notification.topic() {
if let Event::Incoming(notification) = notification {
if let Incoming::Publish(notification) = notification {
match notification.topic.as_str() {
DISPLAY_BRIGHTNESS_TOPIC => { DISPLAY_BRIGHTNESS_TOPIC => {
let payload_text = String::from_utf8( let payload_text = String::from_utf8(notification.payload().to_vec());
notification.payload.as_bytes().to_owned(),
);
match payload_text { match payload_text {
Ok(payload_text) => { Ok(payload_text) => {
let display_brightness: Result< let display_brightness: Result<display::DisplayBrightness, _> =
display::DisplayBrightness, serde_json::from_str(payload_text.as_str());
_,
> = serde_json::from_str(payload_text.as_str());
match display_brightness { match display_brightness {
Ok(display_brightness) => { Ok(display_brightness) => {
match change_display_brightness_tx2 match change_display_brightness_tx2.send(display_brightness)
.send(display_brightness)
{ {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(err) => {
warn!( warn!(
"can not broadcast display brightness. {:?}", "can not send display brightness to channel. {:?}",
err
);
}
};
}
Err(err) => {
warn!(
"can not deserialize display brightness. {:?}",
err err
); );
} }
@ -87,88 +116,69 @@ impl MqttRpc {
} }
Err(err) => { Err(err) => {
warn!( warn!(
"can not decode display brightness message. {:?}", "can not parse display brightness from payload. {:?}",
err err
); );
} }
}; }
}
Err(err) => {
warn!("can not parse display brightness from payload. {:?}", err);
}
}
} }
BOARD_SEND_CMD => { BOARD_SEND_CMD => {
let payload_text = String::from_utf8( let payload_text = String::from_utf8(notification.payload().to_vec());
notification.payload.as_bytes().to_owned(),
);
match payload_text { match payload_text {
Ok(payload_text) => { Ok(payload_text) => {
let message: Result<models::MqMessage, _> = let message: Result<models::MqMessage, _> =
serde_json::from_str(payload_text.as_str()); serde_json::from_str(payload_text.as_str());
match message { match message {
Ok(message) => { Ok(message) => match message_tx_cloned.send(message) {
match message_tx_cloned.send(message) {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(err) => {
warn!( warn!("can not send message to channel. {:?}", err);
"can not broadcast mq message. {:?}",
err
);
}
};
} }
},
Err(err) => { Err(err) => {
warn!( warn!("can not parse message from payload. {:?}", err);
"can not deserialize mq message. {:?}",
err
);
} }
} }
} }
Err(err) => { Err(err) => {
warn!("can not decode mq message message. {:?}", err); warn!("can not parse message from payload. {:?}", err);
}
};
}
&_ => {}
};
} else if let Incoming::ConnAck(connAck) = notification {
if connAck.code == ConnectReturnCode::Success {
match self.initialize().await {
Ok(_) => {
info!("resubscribe topics!");
}
Err(err) => {
info!("resubscribe topics failed! {:?}", err);
}
};
}
} else if let Incoming::Disconnect = notification {
error!("MQTT Disconnected!");
} }
} }
} }
Err(err) => { _ => {}
println!("MQTT Error Event = {:?}", err); },
_ => {
warn!("can not get notification from MQTT server.");
} }
} }
} }
} }
pub async fn initialize(&self) -> anyhow::Result<()> { pub async fn initialize(&self) -> anyhow::Result<()> {
self.subscribe_board().await?; // self.subscribe_board()?;
self.subscribe_display().await?; // self.subscribe_display()?;
self.broadcast_desktop_online(); self.broadcast_desktop_online();
anyhow::Ok(()) anyhow::Ok(())
} }
async fn subscribe_board(&self) -> anyhow::Result<()> { fn subscribe_board(&self) -> anyhow::Result<()> {
self.client self.client
.subscribe("display-ambient-light/board/#", QoS::AtMostOnce) .subscribe("display-ambient-light/board/#", mqtt::QOS_1)
.await .wait()
.map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err))
.map(|_| ())
} }
async fn subscribe_display(&self) -> anyhow::Result<()> { fn subscribe_display(&self) -> anyhow::Result<()> {
self.client self.client
.subscribe(format!("{}/#", DISPLAY_TOPIC), QoS::AtMostOnce) .subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1)
.await .wait()
.map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err))
.map(|_| ())
} }
fn broadcast_desktop_online(&self) { fn broadcast_desktop_online(&self) {
@ -179,15 +189,12 @@ impl MqttRpc {
.format(&format_description::well_known::Iso8601::DEFAULT) .format(&format_description::well_known::Iso8601::DEFAULT)
{ {
Ok(now_str) => { Ok(now_str) => {
match client let msg = mqtt::Message::new(
.publish(
"display-ambient-light/desktop/online", "display-ambient-light/desktop/online",
QoS::AtLeastOnce,
false,
now_str.as_bytes(), now_str.as_bytes(),
) mqtt::QOS_0,
.await );
{ match client.publish(msg).await {
Ok(_) => {} Ok(_) => {}
Err(error) => { Err(error) => {
warn!("can not publish last online time. {}", error) warn!("can not publish last online time. {}", error)
@ -205,12 +212,11 @@ impl MqttRpc {
pub async fn publish_led_sub_pixels(&self, payload: Vec<u8>) -> anyhow::Result<()> { pub async fn publish_led_sub_pixels(&self, payload: Vec<u8>) -> anyhow::Result<()> {
self.client self.client
.publish( .publish(mqtt::Message::new(
"display-ambient-light/desktop/colors", "display-ambient-light/desktop/colors",
rumqttc::QoS::AtLeastOnce,
false,
payload, payload,
) mqtt::QOS_1,
))
.await .await
.map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error))
} }
@ -224,12 +230,11 @@ impl MqttRpc {
let str = serde_json::to_string(&msg) let str = serde_json::to_string(&msg)
.map_err(|err| anyhow::anyhow!("can not serialize {:?}. {:?}", msg, err))?; .map_err(|err| anyhow::anyhow!("can not serialize {:?}. {:?}", msg, err))?;
self.client self.client
.publish( .publish(mqtt::Message::new(
DESKTOP_SEND_CMD, DESKTOP_SEND_CMD,
rumqttc::QoS::AtLeastOnce,
false,
str.as_bytes(), str.as_bytes(),
) mqtt::QOS_1,
))
.await .await
.map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error))
} }