feat: 使用线程处理 MQTT 发送。

This commit is contained in:
Ivan Li 2022-09-10 15:38:08 +08:00
parent 25e2770d01
commit eb96ce4afb
2 changed files with 101 additions and 141 deletions

View File

@ -3,9 +3,11 @@ use esp_idf_svc::eventloop::{Background, EspBackgroundEventLoop, EspEventLoop};
use esp_idf_sys::{self as _}; use esp_idf_sys::{self as _};
use log::*; use log::*;
use std::{ use std::{
borrow::Borrow,
env, env,
sync::{Arc, Mutex},
thread::{self, sleep}, thread::{self, sleep},
time::Duration, borrow::Borrow, time::Duration,
}; };
mod beep; mod beep;
@ -19,7 +21,7 @@ mod wifi;
use crate::{ use crate::{
beep::{ringtone, Beep}, beep::{ringtone, Beep},
dc_out_controller::{DcOutController, DcOutControllerState, DC_OUT_STATE_EVENT_LOOP}, dc_out_controller::{DcOutController, DcOutControllerState, DC_OUT_STATE_EVENT_LOOP},
message_queue::{MqDto, MQ_EVENTLOOP}, message_queue::MqDto,
voltage_detection::{VoltageDetectionWorker, VOLTAGE_EVENTLOOP}, voltage_detection::{VoltageDetectionWorker, VOLTAGE_EVENTLOOP},
}; };
use crate::{ use crate::{
@ -41,13 +43,6 @@ fn main() {
let peripherals = esp_idf_hal::peripherals::Peripherals::take().unwrap(); let peripherals = esp_idf_hal::peripherals::Peripherals::take().unwrap();
let blink_pin = peripherals.pins.gpio5; let blink_pin = peripherals.pins.gpio5;
let beep_pin = peripherals.pins.gpio6;
let ledc_timer0 = peripherals.ledc.timer0;
let ledc_channel0 = peripherals.ledc.channel0;
let dc_out_ctl_pin = peripherals.pins.gpio3;
let i2c0 = peripherals.i2c0;
let sda_pin = peripherals.pins.gpio4;
let scl_pin = peripherals.pins.gpio10;
info!("About to start a background event loop"); info!("About to start a background event loop");
match EspBackgroundEventLoop::new(&Default::default()) { match EspBackgroundEventLoop::new(&Default::default()) {
@ -87,86 +82,81 @@ fn main() {
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
let mut mq = MessageQueue::new(); let mut _mq = MessageQueue::new();
mq.init().unwrap();
let _mq_subscription; let _mq_subscription;
match mq.watch() { match _mq.watch() {
Err(err) => { Err(err) => {
error!("Can not watch MessageQueue. {}", err); error!("Can not watch MessageQueue. {}", err);
} }
Ok(subscription) => _mq_subscription = subscription, Ok(subscription) => _mq_subscription = subscription,
} }
let _mq_tx_for_voltage = _mq.tx.clone();
let _mq_tx_for_dc_out_state = _mq.tx.clone();
let _voltage_subscription; let _voltage_subscription;
if let Some(voltage_event_loop) = unsafe { VOLTAGE_EVENTLOOP.as_mut() } { if let Some(voltage_event_loop) = unsafe { VOLTAGE_EVENTLOOP.as_mut() } {
if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } { _voltage_subscription = voltage_event_loop
_voltage_subscription = voltage_event_loop .subscribe(move |message: &VoltageDetectionWorker| {
.subscribe(move |message: &VoltageDetectionWorker| { if let Ok(json_str) = serde_json::to_string(&message) {
if let Ok(json_str) = serde_json::to_string(&message) { match _mq_tx_for_voltage.lock() {
let result = mq_event_loop.post( Ok(tx) => {
&MqDto { let result = tx.send(MqDto {
topic: "voltage", topic: "voltage".to_string(),
message: json_str.as_bytes(), message: json_str,
}, });
None, match result {
); Ok(()) => {
match result { warn!("send voltage to mq message failed.")
Ok(success) => {
if !success {
warn!("post voltage to mq event failed.")
} }
Err(err) => warn!("send voltage to mq message failed. {}", err),
} }
Err(err) => warn!("post voltage to mq event failed. {}", err),
} }
Err(err) => warn!("send voltage to mq message failed. {}", err),
} }
}) }
.expect(" Listening Event Loop Failed"); })
} else { .expect(" Listening Event Loop Failed");
panic!("MQ_EVENTLOOP is undefined!");
}
} else { } else {
panic!("VOLTAGE_EVENTLOOP is undefined!"); panic!("VOLTAGE_EVENTLOOP is undefined!");
} }
let _dc_out_state_subscription; let _dc_out_state_subscription;
if let Some(dc_state_event_loop) = unsafe { DC_OUT_STATE_EVENT_LOOP.as_mut() } { if let Some(dc_state_event_loop) = unsafe { DC_OUT_STATE_EVENT_LOOP.as_mut() } {
if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } { _dc_out_state_subscription = dc_state_event_loop
_dc_out_state_subscription = dc_state_event_loop .subscribe(move |message: &DcOutControllerState| {
.subscribe(move |message: &DcOutControllerState| { info!("Event Loop Value");
info!("Event Loop Value"); match message.status {
match message.status { dc_out_controller::DcOutStatus::WaitingOff => {
dc_out_controller::DcOutStatus::WaitingOff => { beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.")
beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.")
}
dc_out_controller::DcOutStatus::WaitingOn(_) => {
beep.play(ringtone::BATTERY_LOW).expect("Can not beep.")
}
dc_out_controller::DcOutStatus::TurningOff(_) => {
beep.play(ringtone::SHUTDOWN).expect("Can not beep.")
}
_ => beep.play(ringtone::SILENCE).expect("Can not beep."),
} }
let result = mq_event_loop.post( dc_out_controller::DcOutStatus::WaitingOn(_) => {
&MqDto { beep.play(ringtone::BATTERY_LOW).expect("Can not beep.")
topic: "dc_out_state", }
message: message.to_json().as_bytes(), dc_out_controller::DcOutStatus::TurningOff(_) => {
}, beep.play(ringtone::SHUTDOWN).expect("Can not beep.")
None, }
); _ => beep.play(ringtone::SILENCE).expect("Can not beep."),
match result { }
Ok(success) => { match _mq_tx_for_dc_out_state.lock() {
if !success { Ok(tx) => {
warn!("post dc_out_state event failed.") let result = tx.send(MqDto {
topic: "dc_out_state".to_string(),
message: message.to_json(),
});
match result {
Ok(_) => {
info!("send dc_out_state message success.")
} }
Err(err) => warn!("send dc_out_state message failed. {}", err),
} }
Err(err) => warn!("post dc_out_state event failed. {}", err),
} }
}) Err(err) => warn!("send dc_out_state to mq message failed. {}", err),
.expect(" Listening Event Loop Failed"); }
} else { })
panic!("MQ_EVENTLOOP is undefined!"); .expect(" Listening Event Loop Failed");
}
} else { } else {
panic!("DC_OUT_STATE_EVENT_LOOP is undefined!"); panic!("DC_OUT_STATE_EVENT_LOOP is undefined!");
} }

View File

@ -1,38 +1,32 @@
use std::thread; use std::{
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex,
},
thread::{self, spawn},
time::Duration,
};
use anyhow::Result; use anyhow::Result;
use embedded_svc::{ use embedded_svc::mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS};
event_bus::EventBus,
mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS},
};
use esp_idf_svc::{ use esp_idf_svc::{
eventloop::{ eventloop::{Background, EspEventLoop},
Background, EspBackgroundEventLoop, EspEventFetchData, EspEventLoop, EspEventPostData,
EspSubscription, EspTypedEventDeserializer, EspTypedEventSerializer, EspTypedEventSource,
User,
},
mqtt::client::{EspMqttClient, MqttClientConfiguration}, mqtt::client::{EspMqttClient, MqttClientConfiguration},
}; };
use esp_idf_sys::{c_types, EspError}; use esp_idf_sys::EspError;
use log::*; use log::*;
use serde::{Deserialize, Serialize};
pub static mut MQ_EVENTLOOP: Option<EspEventLoop<esp_idf_svc::eventloop::User<Background>>> = None;
pub struct MessageQueue { pub struct MessageQueue {
pub client: Option<EspMqttClient<ConnState<MessageImpl, EspError>>>, pub tx: Arc<Mutex<Sender<MqDto>>>,
voltage_subscription: Option<EspSubscription<User<Background>>>, pub rx: Arc<Mutex<Receiver<MqDto>>>,
} }
impl MessageQueue { pub struct MessageQueueWatcher {
pub fn new() -> MessageQueue { pub client: EspMqttClient<ConnState<MessageImpl, EspError>>,
return MessageQueue { }
client: None,
voltage_subscription: None,
};
}
pub fn init(&mut self) -> Result<()> { impl MessageQueueWatcher {
pub fn new() -> Result<Self> {
let conf = MqttClientConfiguration { let conf = MqttClientConfiguration {
client_id: Some("rust-esp32-std-demo"), client_id: Some("rust-esp32-std-demo"),
crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
@ -74,14 +68,11 @@ impl MessageQueue {
info!("Published a hello message to topic \"esp32-c3-rust-wifi-demo/ping\""); info!("Published a hello message to topic \"esp32-c3-rust-wifi-demo/ping\"");
self.client = Some(client); anyhow::Ok(Self { client })
anyhow::Ok(())
} }
pub fn publish(&mut self, topic: &str, bytes: &[u8]) -> Result<u32> { pub fn publish(&mut self, topic: &str, bytes: &[u8]) -> Result<u32> {
self.client self.client
.as_mut()
.unwrap()
.publish( .publish(
format!("{}/{}", "ups_0_2", topic).as_str(), format!("{}/{}", "ups_0_2", topic).as_str(),
QoS::AtMostOnce, QoS::AtMostOnce,
@ -90,56 +81,35 @@ impl MessageQueue {
) )
.map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err)) .map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err))
} }
}
pub fn watch(mut self) -> anyhow::Result<EspSubscription<User<Background>>> { impl MessageQueue {
match EspBackgroundEventLoop::new(&Default::default()) { pub fn new() -> Self {
Ok(eventloop) => unsafe { MQ_EVENTLOOP = Some(eventloop) }, let (tx, rx) = mpsc::channel();
Err(err) => anyhow::bail!("Init Event Loop failed. {:?}", err), return MessageQueue {
} tx: Arc::new(Mutex::new(tx)),
if let Some(eventloop) = unsafe { MQ_EVENTLOOP.as_mut() } { rx: Arc::new(Mutex::new(rx)),
let subscription = eventloop };
.subscribe(move |dto: &MqDto| { }
info!(
"publish data: {:?}", pub fn watch(&mut self) -> anyhow::Result<()> {
unsafe { let mut watcher = MessageQueueWatcher::new()
String::from_utf8_unchecked(dto.message.to_vec()) .map_err(|err| anyhow::anyhow!("Create MessageQueueWatcher failed. {}", err))?;
} let rx = self.rx.to_owned();
); // let (tx, rx) = mpsc::channel::<MqDto>();
if let Err(err) = self.publish(dto.topic, dto.message) { spawn(move || loop {
warn!("Can not publish message to MQTT. {}", err); if let Ok(dto) = rx.lock().unwrap().recv_timeout(Duration::from_secs(1)) {
} if let Err(err) = watcher.publish(dto.topic.as_str(), dto.message.as_bytes()) {
}) warn!("Can not publish message to MQTT. {}", err);
.map_err(|err| anyhow::anyhow!(" Listening Event Loop Failed. {}", err))?; }
// self.voltage_subscription = Some(subscription); }
return anyhow::Ok(subscription); });
} anyhow::Ok(())
anyhow::bail!("MQ_EVENTLOOP is Not Defined.")
} }
} }
#[derive(Copy, Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone)]
pub struct MqDto<'a, 'b> { pub struct MqDto {
pub topic: &'a str, pub message: String,
pub message: &'b [u8], pub topic: String,
}
impl EspTypedEventSource for MqDto<'_, '_> {
fn source() -> *const c_types::c_char {
b"MqDto\0".as_ptr() as *const _
}
}
impl EspTypedEventSerializer<MqDto<'_, '_>> for MqDto<'_, '_> {
fn serialize<R>(event: &MqDto, f: impl for<'a> FnOnce(&'a EspEventPostData) -> R) -> R {
f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) })
}
}
impl<'b, 'c> EspTypedEventDeserializer<MqDto<'b, 'c>> for MqDto<'b, 'c> {
fn deserialize<R>(
data: &EspEventFetchData,
f: &mut impl for<'a> FnMut(&'a MqDto<'b, 'c>) -> R,
) -> R {
f(unsafe { data.as_payload() })
}
} }