ups-esp32c3-rust/src/message_queue.rs

114 lines
3.4 KiB
Rust

use std::{
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex,
},
thread::{self, spawn},
time::Duration,
};
use anyhow::Result;
use embedded_svc::mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS};
use esp_idf_svc::mqtt::client::{EspMqttClient, MqttClientConfiguration};
use esp_idf_sys::EspError;
use log::*;
pub struct MessageQueue {
pub tx: Arc<Mutex<Sender<MqDto>>>,
pub rx: Arc<Mutex<Receiver<MqDto>>>,
}
pub struct MessageQueueWatcher {
pub client: EspMqttClient<ConnState<MessageImpl, EspError>>,
}
impl MessageQueueWatcher {
pub fn new() -> Result<Self> {
let conf = MqttClientConfiguration {
client_id: Some("rust-esp32-std-demo"),
crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
};
let (mut client, mut connection) =
EspMqttClient::new_with_conn("mqtt://192.168.31.11:1883", &conf)?;
info!("MQTT client started");
thread::spawn(move || {
info!("MQTT Listening for messages");
while let Some(msg) = connection.next() {
match msg {
Err(e) => info!("MQTT Message ERROR: {}", e),
Ok(msg) => info!("MQTT Message: {:?}", msg),
}
}
info!("MQTT connection loop exit");
});
client
.subscribe("esp32-c3-rust-wifi-demo", QoS::AtMostOnce)
.map_err(|err| anyhow::anyhow!("subscribe message from queue failed. {}", err))?;
info!("Subscribed to all topics (esp32-c3-rust-wifi-demo)");
client
.publish(
"esp32-c3-rust-wifi-demo/ping",
QoS::AtMostOnce,
false,
"Hello".as_bytes(),
)
.map_err(|err| anyhow::anyhow!("publish message to queue failed. {}", err))?;
info!("Published a hello message to topic \"esp32-c3-rust-wifi-demo/ping\"");
anyhow::Ok(Self { client })
}
pub fn publish(&mut self, topic: &str, bytes: &[u8]) -> Result<u32> {
self.client
.publish(
format!("{}/{}", "ups_0_2", topic).as_str(),
QoS::AtMostOnce,
false,
bytes,
)
.map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err))
}
}
impl MessageQueue {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel();
return MessageQueue {
tx: Arc::new(Mutex::new(tx)),
rx: Arc::new(Mutex::new(rx)),
};
}
pub fn watch(&mut self) -> anyhow::Result<()> {
let mut watcher = MessageQueueWatcher::new()
.map_err(|err| anyhow::anyhow!("Create MessageQueueWatcher failed. {}", err))?;
let rx = self.rx.to_owned();
// let (tx, rx) = mpsc::channel::<MqDto>();
spawn(move || loop {
if let Ok(dto) = rx.lock().unwrap().recv_timeout(Duration::from_millis(400)) {
if let Err(err) = watcher.publish(dto.topic.as_str(), dto.message.as_bytes()) {
warn!("Can not publish message to MQTT. {}", err);
}
}
thread::sleep(Duration::from_millis(100))
});
anyhow::Ok(())
}
}
#[derive(Debug, Clone)]
pub struct MqDto {
pub message: String,
pub topic: String,
}