use std::{ sync::{ mpsc::{self, Receiver, Sender}, Arc, Mutex, }, thread::{self, spawn}, time::Duration, }; use anyhow::Result; use embedded_svc::mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::{EspMqttClient, MqttClientConfiguration}; use esp_idf_sys::EspError; use log::*; pub struct MessageQueue { pub tx: Arc>>, pub rx: Arc>>, } pub struct MessageQueueWatcher { pub client: EspMqttClient>, } impl MessageQueueWatcher { pub fn new() -> Result { let conf = MqttClientConfiguration { client_id: Some("rust-esp32-std-demo"), crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), ..Default::default() }; let (mut client, mut connection) = EspMqttClient::new_with_conn("mqtt://192.168.31.11:1883", &conf)?; info!("MQTT client started"); thread::spawn(move || { info!("MQTT Listening for messages"); while let Some(msg) = connection.next() { match msg { Err(e) => info!("MQTT Message ERROR: {}", e), Ok(msg) => info!("MQTT Message: {:?}", msg), } } info!("MQTT connection loop exit"); }); client .subscribe("esp32-c3-rust-wifi-demo", QoS::AtMostOnce) .map_err(|err| anyhow::anyhow!("subscribe message from queue failed. {}", err))?; info!("Subscribed to all topics (esp32-c3-rust-wifi-demo)"); client .publish( "esp32-c3-rust-wifi-demo/ping", QoS::AtMostOnce, false, "Hello".as_bytes(), ) .map_err(|err| anyhow::anyhow!("publish message to queue failed. {}", err))?; info!("Published a hello message to topic \"esp32-c3-rust-wifi-demo/ping\""); anyhow::Ok(Self { client }) } pub fn publish(&mut self, topic: &str, bytes: &[u8]) -> Result { self.client .publish( format!("{}/{}", "ups_0_2", topic).as_str(), QoS::AtMostOnce, false, bytes, ) .map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err)) } } impl MessageQueue { pub fn new() -> Self { let (tx, rx) = mpsc::channel(); return MessageQueue { tx: Arc::new(Mutex::new(tx)), rx: Arc::new(Mutex::new(rx)), }; } pub fn watch(&mut self) -> anyhow::Result<()> { let mut watcher = MessageQueueWatcher::new() .map_err(|err| anyhow::anyhow!("Create MessageQueueWatcher failed. {}", err))?; let rx = self.rx.to_owned(); // let (tx, rx) = mpsc::channel::(); spawn(move || loop { if let Ok(dto) = rx.lock().unwrap().recv_timeout(Duration::from_millis(400)) { if let Err(err) = watcher.publish(dto.topic.as_str(), dto.message.as_bytes()) { warn!("Can not publish message to MQTT. {}", err); } } thread::sleep(Duration::from_millis(100)) }); anyhow::Ok(()) } } #[derive(Debug, Clone)] pub struct MqDto { pub message: String, pub topic: String, }