114 lines
3.4 KiB
Rust
114 lines
3.4 KiB
Rust
use std::{
|
|
sync::{
|
|
mpsc::{self, Receiver, Sender},
|
|
Arc, Mutex,
|
|
},
|
|
thread::{self, spawn},
|
|
time::Duration,
|
|
};
|
|
|
|
use anyhow::Result;
|
|
use embedded_svc::mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS};
|
|
use esp_idf_svc::mqtt::client::{EspMqttClient, MqttClientConfiguration};
|
|
use esp_idf_sys::EspError;
|
|
use log::*;
|
|
|
|
pub struct MessageQueue {
|
|
pub tx: Arc<Mutex<Sender<MqDto>>>,
|
|
pub rx: Arc<Mutex<Receiver<MqDto>>>,
|
|
}
|
|
|
|
pub struct MessageQueueWatcher {
|
|
pub client: EspMqttClient<ConnState<MessageImpl, EspError>>,
|
|
}
|
|
|
|
impl MessageQueueWatcher {
|
|
pub fn new() -> Result<Self> {
|
|
let conf = MqttClientConfiguration {
|
|
client_id: Some("rust-esp32-std-demo"),
|
|
crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
|
|
|
|
..Default::default()
|
|
};
|
|
|
|
let (mut client, mut connection) =
|
|
EspMqttClient::new_with_conn("mqtt://192.168.31.11:1883", &conf)?;
|
|
|
|
info!("MQTT client started");
|
|
thread::spawn(move || {
|
|
info!("MQTT Listening for messages");
|
|
|
|
while let Some(msg) = connection.next() {
|
|
match msg {
|
|
Err(e) => info!("MQTT Message ERROR: {}", e),
|
|
Ok(msg) => info!("MQTT Message: {:?}", msg),
|
|
}
|
|
}
|
|
|
|
info!("MQTT connection loop exit");
|
|
});
|
|
|
|
client
|
|
.subscribe("esp32-c3-rust-wifi-demo", QoS::AtMostOnce)
|
|
.map_err(|err| anyhow::anyhow!("subscribe message from queue failed. {}", err))?;
|
|
|
|
info!("Subscribed to all topics (esp32-c3-rust-wifi-demo)");
|
|
|
|
client
|
|
.publish(
|
|
"esp32-c3-rust-wifi-demo/ping",
|
|
QoS::AtMostOnce,
|
|
false,
|
|
"Hello".as_bytes(),
|
|
)
|
|
.map_err(|err| anyhow::anyhow!("publish message to queue failed. {}", err))?;
|
|
|
|
info!("Published a hello message to topic \"esp32-c3-rust-wifi-demo/ping\"");
|
|
|
|
anyhow::Ok(Self { client })
|
|
}
|
|
|
|
pub fn publish(&mut self, topic: &str, bytes: &[u8]) -> Result<u32> {
|
|
self.client
|
|
.publish(
|
|
format!("{}/{}", "ups_0_2", topic).as_str(),
|
|
QoS::AtMostOnce,
|
|
false,
|
|
bytes,
|
|
)
|
|
.map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err))
|
|
}
|
|
}
|
|
|
|
impl MessageQueue {
|
|
pub fn new() -> Self {
|
|
let (tx, rx) = mpsc::channel();
|
|
return MessageQueue {
|
|
tx: Arc::new(Mutex::new(tx)),
|
|
rx: Arc::new(Mutex::new(rx)),
|
|
};
|
|
}
|
|
|
|
pub fn watch(&mut self) -> anyhow::Result<()> {
|
|
let mut watcher = MessageQueueWatcher::new()
|
|
.map_err(|err| anyhow::anyhow!("Create MessageQueueWatcher failed. {}", err))?;
|
|
let rx = self.rx.to_owned();
|
|
// let (tx, rx) = mpsc::channel::<MqDto>();
|
|
spawn(move || loop {
|
|
if let Ok(dto) = rx.lock().unwrap().recv_timeout(Duration::from_millis(400)) {
|
|
if let Err(err) = watcher.publish(dto.topic.as_str(), dto.message.as_bytes()) {
|
|
warn!("Can not publish message to MQTT. {}", err);
|
|
}
|
|
}
|
|
thread::sleep(Duration::from_millis(100))
|
|
});
|
|
anyhow::Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct MqDto {
|
|
pub message: String,
|
|
pub topic: String,
|
|
}
|