use crate::display; use image::EncodableLayout; use paris::warn; use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; use std::time::Duration; use time::{format_description, OffsetDateTime}; use tokio::{sync::broadcast, task}; const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness"; pub struct MqttRpc { client: AsyncClient, change_display_brightness_tx: broadcast::Sender, } impl MqttRpc { pub fn new() -> Self { let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883); options.set_keep_alive(Duration::from_secs(5)); let (client, mut eventloop) = AsyncClient::new(options, 10); let (change_display_brightness_tx, _) = broadcast::channel::(16); let change_display_brightness_tx2 = change_display_brightness_tx.clone(); task::spawn(async move { loop { match eventloop.poll().await { Ok(notification) => { let handled = || -> anyhow::Result<()> { println!("MQTT notification = {:?}", notification); if let Event::Incoming(notification) = notification { if let Incoming::Publish(notification) = notification { match notification.topic.as_str() { DISPLAY_BRIGHTNESS_TOPIC => { let payload_text = String::from_utf8( notification.payload.as_bytes().to_owned(), ) .map_err(|err| { anyhow::anyhow!("can not parse json. {:?}", err) })?; let display_brightness: display::DisplayBrightness = serde_json::from_str(payload_text.as_str()) .map_err(|err| { anyhow::anyhow!( "can not deserialize display brightness. {:?}", err ) })?; change_display_brightness_tx2 .send(display_brightness) .map_err(|err| { anyhow::anyhow!( "can not broadcast display brightness. {:?}", err ) })?; } &_ => {} }; } } Ok(()) }; if let Err(err) = handled() { warn!("handle notification was failed. Error: {:?}", err); } } Err(err) => { println!("MQTT Error Event = {:?}", err); } } } }); Self { client, change_display_brightness_tx, } } pub async fn initialize(&mut self) -> anyhow::Result<()> { self.subscribe_board().await?; self.subscribe_display().await?; self.broadcast_desktop_online(); anyhow::Ok(()) } async fn subscribe_board(&self) -> anyhow::Result<()> { self.client .subscribe("display-ambient-light/board/#", QoS::AtMostOnce) .await .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) } async fn subscribe_display(&self) -> anyhow::Result<()> { self.client .subscribe(format!("{}/#", DISPLAY_TOPIC), QoS::AtMostOnce) .await .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) } fn broadcast_desktop_online(&mut self) { let client = self.client.to_owned(); task::spawn(async move { loop { match OffsetDateTime::now_utc() .format(&format_description::well_known::Iso8601::DEFAULT) { Ok(now_str) => { match client .publish( "display-ambient-light/desktop/online", QoS::AtLeastOnce, false, now_str.as_bytes(), ) .await { Ok(_) => {} Err(error) => { warn!("can not publish last online time. {}", error) } } } Err(error) => { warn!("can not get time for now. {}", error); } } tokio::time::sleep(Duration::from_millis(1000)).await; } }); } pub async fn publish_led_sub_pixels(&self, payload: Vec) -> anyhow::Result<()> { self.client .publish( "display-ambient-light/desktop/colors", rumqttc::QoS::AtLeastOnce, false, payload, ) .await .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) } pub fn subscribe_change_display_brightness_rx(&self) -> broadcast::Receiver { self.change_display_brightness_tx.subscribe() } }