feat: 使用 ESP 事件循环传递消息。(有内存越界问题

This commit is contained in:
Ivan Li 2022-09-10 09:23:07 +08:00
parent 2a8e7db4b3
commit 25e2770d01
4 changed files with 169 additions and 51 deletions

View File

@ -11,7 +11,7 @@ use esp_idf_sys::c_types;
use log::{info, warn}; use log::{info, warn};
use serde_json::json; use serde_json::json;
use crate::{voltage_detection::VoltageDetectionWorker, VOLTAGE_EVENTLOOP}; use crate::{voltage_detection::{VoltageDetectionWorker, VOLTAGE_EVENTLOOP}};
const WAITING_OFF_SECONDS: u8 = 60; const WAITING_OFF_SECONDS: u8 = 60;

View File

@ -1,11 +1,11 @@
use embedded_svc::event_bus::EventBus; use embedded_svc::event_bus::{EventBus, Postbox};
use esp_idf_svc::eventloop::{Background, EspBackgroundEventLoop, EspEventLoop}; use esp_idf_svc::eventloop::{Background, EspBackgroundEventLoop, EspEventLoop};
use esp_idf_sys::{self as _}; use esp_idf_sys::{self as _};
use log::*; use log::*;
use std::{ use std::{
env, env,
thread::{self, sleep}, thread::{self, sleep},
time::Duration, time::Duration, borrow::Borrow,
}; };
mod beep; mod beep;
@ -19,14 +19,13 @@ mod wifi;
use crate::{ use crate::{
beep::{ringtone, Beep}, beep::{ringtone, Beep},
dc_out_controller::{DcOutController, DcOutControllerState, DC_OUT_STATE_EVENT_LOOP}, dc_out_controller::{DcOutController, DcOutControllerState, DC_OUT_STATE_EVENT_LOOP},
voltage_detection::VoltageDetectionWorker, message_queue::{MqDto, MQ_EVENTLOOP},
voltage_detection::{VoltageDetectionWorker, VOLTAGE_EVENTLOOP},
}; };
use crate::{ use crate::{
message_queue::MessageQueue, time::Time, voltage_detection::VoltageDetection, wifi::Internet, message_queue::MessageQueue, time::Time, voltage_detection::VoltageDetection, wifi::Internet,
}; };
static mut VOLTAGE_EVENTLOOP: Option<EspEventLoop<esp_idf_svc::eventloop::User<Background>>> = None;
fn main() { fn main() {
env::set_var("DEFMT_LOG", "trace"); env::set_var("DEFMT_LOG", "trace");
env::set_var("RUST_BACKTRACE", "1"); env::set_var("RUST_BACKTRACE", "1");
@ -73,26 +72,12 @@ fn main() {
let _wifi = Internet::new().unwrap(); let _wifi = Internet::new().unwrap();
let mut mq = MessageQueue::new();
mq.init().unwrap();
let mut time = Time::new(); let mut time = Time::new();
time.sync().unwrap(); time.sync().unwrap();
let mut beep = Beep::new(); sleep(Duration::from_millis(100));
// let _voltage_subscription; let mut beep = Beep::new();
// if let Some(eventloop) = unsafe { VOLTAGE_EVENTLOOP.as_mut() } {
// _voltage_subscription = eventloop
// .subscribe(move |message: &VoltageDetectionWorker| {
// if let Ok(json_str) = serde_json::to_string(&message) {
// if let Err(err) = mq.publish("voltage", json_str.as_bytes()) {
// warn!("Can not publish message to MQTT. {}", err);
// }
// }
// })
// .expect(" Listening Event Loop Failed");
// }
let mut dc_out_controller = let mut dc_out_controller =
DcOutController::get_instance().expect("Can not get DcOutController instance"); DcOutController::get_instance().expect("Can not get DcOutController instance");
@ -100,33 +85,93 @@ fn main() {
.watch() .watch()
.expect("Can not watch for dc_out_controller"); .expect("Can not watch for dc_out_controller");
let _dc_out_state_subscription; sleep(Duration::from_millis(100));
if let Some(eventloop) = unsafe { DC_OUT_STATE_EVENT_LOOP.as_mut() } {
_dc_out_state_subscription = eventloop let mut mq = MessageQueue::new();
.subscribe(move |message: &DcOutControllerState| { mq.init().unwrap();
info!("Event Loop Value");
match message.status { let _mq_subscription;
dc_out_controller::DcOutStatus::WaitingOff => { match mq.watch() {
beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.") Err(err) => {
error!("Can not watch MessageQueue. {}", err);
}
Ok(subscription) => _mq_subscription = subscription,
}
let _voltage_subscription;
if let Some(voltage_event_loop) = unsafe { VOLTAGE_EVENTLOOP.as_mut() } {
if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } {
_voltage_subscription = voltage_event_loop
.subscribe(move |message: &VoltageDetectionWorker| {
if let Ok(json_str) = serde_json::to_string(&message) {
let result = mq_event_loop.post(
&MqDto {
topic: "voltage",
message: json_str.as_bytes(),
},
None,
);
match result {
Ok(success) => {
if !success {
warn!("post voltage to mq event failed.")
}
}
Err(err) => warn!("post voltage to mq event failed. {}", err),
}
} }
dc_out_controller::DcOutStatus::WaitingOn(_) => { })
beep.play(ringtone::BATTERY_LOW).expect("Can not beep.") .expect(" Listening Event Loop Failed");
} } else {
dc_out_controller::DcOutStatus::TurningOff(_) => { panic!("MQ_EVENTLOOP is undefined!");
beep.play(ringtone::SHUTDOWN).expect("Can not beep.") }
}
_ => {beep.play(ringtone::SILENCE).expect("Can not beep.")}
}
if let Err(err) = mq.publish("dcOut", message.to_json().as_bytes()) {
warn!("Can not publish message to MQTT. {}", err);
}
})
.expect(" Listening Event Loop Failed");
} else { } else {
error!("DC_OUT_STATE_EVENT_LOOP is None"); panic!("VOLTAGE_EVENTLOOP is undefined!");
}
let _dc_out_state_subscription;
if let Some(dc_state_event_loop) = unsafe { DC_OUT_STATE_EVENT_LOOP.as_mut() } {
if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } {
_dc_out_state_subscription = dc_state_event_loop
.subscribe(move |message: &DcOutControllerState| {
info!("Event Loop Value");
match message.status {
dc_out_controller::DcOutStatus::WaitingOff => {
beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.")
}
dc_out_controller::DcOutStatus::WaitingOn(_) => {
beep.play(ringtone::BATTERY_LOW).expect("Can not beep.")
}
dc_out_controller::DcOutStatus::TurningOff(_) => {
beep.play(ringtone::SHUTDOWN).expect("Can not beep.")
}
_ => beep.play(ringtone::SILENCE).expect("Can not beep."),
}
let result = mq_event_loop.post(
&MqDto {
topic: "dc_out_state",
message: message.to_json().as_bytes(),
},
None,
);
match result {
Ok(success) => {
if !success {
warn!("post dc_out_state event failed.")
}
}
Err(err) => warn!("post dc_out_state event failed. {}", err),
}
})
.expect(" Listening Event Loop Failed");
} else {
panic!("MQ_EVENTLOOP is undefined!");
}
} else {
panic!("DC_OUT_STATE_EVENT_LOOP is undefined!");
} }
loop { loop {
sleep(Duration::from_millis(1000)); sleep(Duration::from_millis(100));
} }
} }

View File

@ -1,18 +1,35 @@
use std::thread; use std::thread;
use anyhow::Result; use anyhow::Result;
use embedded_svc::mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS}; use embedded_svc::{
use esp_idf_svc::mqtt::client::{EspMqttClient, MqttClientConfiguration}; event_bus::EventBus,
use esp_idf_sys::EspError; mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS},
};
use esp_idf_svc::{
eventloop::{
Background, EspBackgroundEventLoop, EspEventFetchData, EspEventLoop, EspEventPostData,
EspSubscription, EspTypedEventDeserializer, EspTypedEventSerializer, EspTypedEventSource,
User,
},
mqtt::client::{EspMqttClient, MqttClientConfiguration},
};
use esp_idf_sys::{c_types, EspError};
use log::*; use log::*;
use serde::{Deserialize, Serialize};
pub static mut MQ_EVENTLOOP: Option<EspEventLoop<esp_idf_svc::eventloop::User<Background>>> = None;
pub struct MessageQueue { pub struct MessageQueue {
pub client: Option<EspMqttClient<ConnState<MessageImpl, EspError>>>, pub client: Option<EspMqttClient<ConnState<MessageImpl, EspError>>>,
voltage_subscription: Option<EspSubscription<User<Background>>>,
} }
impl MessageQueue { impl MessageQueue {
pub fn new() -> MessageQueue { pub fn new() -> MessageQueue {
return MessageQueue { client: None }; return MessageQueue {
client: None,
voltage_subscription: None,
};
} }
pub fn init(&mut self) -> Result<()> { pub fn init(&mut self) -> Result<()> {
@ -73,4 +90,56 @@ impl MessageQueue {
) )
.map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err)) .map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err))
} }
pub fn watch(mut self) -> anyhow::Result<EspSubscription<User<Background>>> {
match EspBackgroundEventLoop::new(&Default::default()) {
Ok(eventloop) => unsafe { MQ_EVENTLOOP = Some(eventloop) },
Err(err) => anyhow::bail!("Init Event Loop failed. {:?}", err),
}
if let Some(eventloop) = unsafe { MQ_EVENTLOOP.as_mut() } {
let subscription = eventloop
.subscribe(move |dto: &MqDto| {
info!(
"publish data: {:?}",
unsafe {
String::from_utf8_unchecked(dto.message.to_vec())
}
);
if let Err(err) = self.publish(dto.topic, dto.message) {
warn!("Can not publish message to MQTT. {}", err);
}
})
.map_err(|err| anyhow::anyhow!(" Listening Event Loop Failed. {}", err))?;
// self.voltage_subscription = Some(subscription);
return anyhow::Ok(subscription);
}
anyhow::bail!("MQ_EVENTLOOP is Not Defined.")
}
}
#[derive(Copy, Debug, Clone, Serialize, Deserialize)]
pub struct MqDto<'a, 'b> {
pub topic: &'a str,
pub message: &'b [u8],
}
impl EspTypedEventSource for MqDto<'_, '_> {
fn source() -> *const c_types::c_char {
b"MqDto\0".as_ptr() as *const _
}
}
impl EspTypedEventSerializer<MqDto<'_, '_>> for MqDto<'_, '_> {
fn serialize<R>(event: &MqDto, f: impl for<'a> FnOnce(&'a EspEventPostData) -> R) -> R {
f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) })
}
}
impl<'b, 'c> EspTypedEventDeserializer<MqDto<'b, 'c>> for MqDto<'b, 'c> {
fn deserialize<R>(
data: &EspEventFetchData,
f: &mut impl for<'a> FnMut(&'a MqDto<'b, 'c>) -> R,
) -> R {
f(unsafe { data.as_payload() })
}
} }

View File

@ -18,7 +18,7 @@ use esp_idf_hal::{
use esp_idf_svc::{ use esp_idf_svc::{
eventloop::{ eventloop::{
EspEventFetchData, EspEventPostData, EspTypedEventDeserializer, EspTypedEventSerializer, EspEventFetchData, EspEventPostData, EspTypedEventDeserializer, EspTypedEventSerializer,
EspTypedEventSource, EspTypedEventSource, EspEventLoop, Background, EspBackgroundEventLoop,
}, },
timer::{EspTimer, EspTimerService}, timer::{EspTimer, EspTimerService},
}; };
@ -26,7 +26,7 @@ use esp_idf_sys::c_types;
use log::{info, warn}; use log::{info, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::VOLTAGE_EVENTLOOP; pub static mut VOLTAGE_EVENTLOOP: Option<EspEventLoop<esp_idf_svc::eventloop::User<Background>>> = None;
const ADAPTER_OFFSET: f32 = 12002f32 / 900f32; const ADAPTER_OFFSET: f32 = 12002f32 / 900f32;
const BATTERY_OFFSET: f32 = 12002f32 / 900f32; const BATTERY_OFFSET: f32 = 12002f32 / 900f32;
@ -53,6 +53,10 @@ impl VoltageDetection {
} }
pub fn watching(&mut self) -> anyhow::Result<()> { pub fn watching(&mut self) -> anyhow::Result<()> {
match EspBackgroundEventLoop::new(&Default::default()) {
Ok(eventloop) => unsafe { VOLTAGE_EVENTLOOP = Some(eventloop) },
Err(err) => anyhow::bail!("Init Event Loop failed. {:?}", err),
}
let worker = Arc::new(Mutex::new(self.worker)); let worker = Arc::new(Mutex::new(self.worker));
let mut timer = EspTimerService::new()?.timer(move || { let mut timer = EspTimerService::new()?.timer(move || {
match worker.lock().as_mut() { match worker.lock().as_mut() {