use crate::{display, models}; use image::EncodableLayout; use paris::{warn, info, error}; use rumqttc::{ AsyncClient, ConnectReturnCode, Event, EventLoop, Incoming, MqttOptions, Outgoing, QoS, }; use std::{borrow::Borrow, rc::Rc, sync::Arc, time::Duration}; use tauri::async_runtime::{Mutex, TokioJoinHandle}; use time::{format_description, OffsetDateTime}; use tokio::{sync::broadcast, task, time::sleep}; const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness"; const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd"; const DESKTOP_SEND_CMD: &'static str = "display-ambient-light/desktop/cmd"; pub struct MqttRpc { client: AsyncClient, change_display_brightness_tx: broadcast::Sender, message_tx: broadcast::Sender, eventloop: Arc>, } impl MqttRpc { pub fn new() -> Self { let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883); options.set_keep_alive(Duration::from_secs(5)); options.set_clean_session(false); let (client, mut eventloop) = AsyncClient::new(options, 10); let (change_display_brightness_tx, _) = broadcast::channel::(16); let (message_tx, _) = broadcast::channel::(32); Self { client, change_display_brightness_tx, message_tx, eventloop: Arc::new(Mutex::new(eventloop)), } } pub async fn listen(&self) { let change_display_brightness_tx2 = self.change_display_brightness_tx.clone(); let message_tx_cloned = self.message_tx.clone(); let mut eventloop = self.eventloop.clone().lock_owned().await; loop { match eventloop.poll().await { Ok(notification) => { if let Event::Incoming(notification) = notification { if let Incoming::Publish(notification) = notification { match notification.topic.as_str() { DISPLAY_BRIGHTNESS_TOPIC => { let payload_text = String::from_utf8( notification.payload.as_bytes().to_owned(), ); match payload_text { Ok(payload_text) => { let display_brightness: Result< display::DisplayBrightness, _, > = serde_json::from_str(payload_text.as_str()); match display_brightness { Ok(display_brightness) => { match change_display_brightness_tx2 .send(display_brightness) { Ok(_) => {} Err(err) => { warn!( "can not broadcast display brightness. {:?}", err ); } }; } Err(err) => { warn!( "can not deserialize display brightness. {:?}", err ); } } } Err(err) => { warn!( "can not decode display brightness message. {:?}", err ); } }; } BOARD_SEND_CMD => { let payload_text = String::from_utf8( notification.payload.as_bytes().to_owned(), ); match payload_text { Ok(payload_text) => { let message: Result = serde_json::from_str(payload_text.as_str()); match message { Ok(message) => { match message_tx_cloned.send(message) { Ok(_) => {} Err(err) => { warn!( "can not broadcast mq message. {:?}", err ); } }; } Err(err) => { warn!( "can not deserialize mq message. {:?}", err ); } } } Err(err) => { warn!("can not decode mq message message. {:?}", err); } }; } &_ => {} }; } else if let Incoming::ConnAck(connAck) = notification { if connAck.code == ConnectReturnCode::Success { match self.initialize().await { Ok(_) => { info!("resubscribe topics!"); } Err(err) => { info!("resubscribe topics failed! {:?}", err); } }; } } else if let Incoming::Disconnect = notification { error!("MQTT Disconnected!"); } } } Err(err) => { println!("MQTT Error Event = {:?}", err); } } } } pub async fn initialize(&self) -> anyhow::Result<()> { self.subscribe_board().await?; self.subscribe_display().await?; self.broadcast_desktop_online(); anyhow::Ok(()) } async fn subscribe_board(&self) -> anyhow::Result<()> { self.client .subscribe("display-ambient-light/board/#", QoS::AtMostOnce) .await .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) } async fn subscribe_display(&self) -> anyhow::Result<()> { self.client .subscribe(format!("{}/#", DISPLAY_TOPIC), QoS::AtMostOnce) .await .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) } fn broadcast_desktop_online(&self) { let client = self.client.to_owned(); task::spawn(async move { loop { match OffsetDateTime::now_utc() .format(&format_description::well_known::Iso8601::DEFAULT) { Ok(now_str) => { match client .publish( "display-ambient-light/desktop/online", QoS::AtLeastOnce, false, now_str.as_bytes(), ) .await { Ok(_) => {} Err(error) => { warn!("can not publish last online time. {}", error) } } } Err(error) => { warn!("can not get time for now. {}", error); } } tokio::time::sleep(Duration::from_millis(1000)).await; } }); } pub async fn publish_led_sub_pixels(&self, payload: Vec) -> anyhow::Result<()> { self.client .publish( "display-ambient-light/desktop/colors", rumqttc::QoS::AtLeastOnce, false, payload, ) .await .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) } pub fn subscribe_change_display_brightness_rx( &self, ) -> broadcast::Receiver { self.change_display_brightness_tx.subscribe() } pub async fn publish_desktop_cmd(&self, msg: models::MqMessage) -> anyhow::Result<()> { let str = serde_json::to_string(&msg) .map_err(|err| anyhow::anyhow!("can not serialize {:?}. {:?}", msg, err))?; self.client .publish( DESKTOP_SEND_CMD, rumqttc::QoS::AtLeastOnce, false, str.as_bytes(), ) .await .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) } }