From 25e2770d0194ff29f543a168d5008877439e587a Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sat, 10 Sep 2022 09:23:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BD=BF=E7=94=A8=20ESP=20=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E5=BE=AA=E7=8E=AF=E4=BC=A0=E9=80=92=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E3=80=82=EF=BC=88=E6=9C=89=E5=86=85=E5=AD=98=E8=B6=8A=E7=95=8C?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/dc_out_controller.rs | 2 +- src/main.rs | 133 ++++++++++++++++++++++++++------------- src/message_queue.rs | 77 +++++++++++++++++++++-- src/voltage_detection.rs | 8 ++- 4 files changed, 169 insertions(+), 51 deletions(-) diff --git a/src/dc_out_controller.rs b/src/dc_out_controller.rs index ac7cb69..43e0a84 100644 --- a/src/dc_out_controller.rs +++ b/src/dc_out_controller.rs @@ -11,7 +11,7 @@ use esp_idf_sys::c_types; use log::{info, warn}; use serde_json::json; -use crate::{voltage_detection::VoltageDetectionWorker, VOLTAGE_EVENTLOOP}; +use crate::{voltage_detection::{VoltageDetectionWorker, VOLTAGE_EVENTLOOP}}; const WAITING_OFF_SECONDS: u8 = 60; diff --git a/src/main.rs b/src/main.rs index b735c3f..c3309b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ -use embedded_svc::event_bus::EventBus; +use embedded_svc::event_bus::{EventBus, Postbox}; use esp_idf_svc::eventloop::{Background, EspBackgroundEventLoop, EspEventLoop}; use esp_idf_sys::{self as _}; use log::*; use std::{ env, thread::{self, sleep}, - time::Duration, + time::Duration, borrow::Borrow, }; mod beep; @@ -19,14 +19,13 @@ mod wifi; use crate::{ beep::{ringtone, Beep}, dc_out_controller::{DcOutController, DcOutControllerState, DC_OUT_STATE_EVENT_LOOP}, - voltage_detection::VoltageDetectionWorker, + message_queue::{MqDto, MQ_EVENTLOOP}, + voltage_detection::{VoltageDetectionWorker, VOLTAGE_EVENTLOOP}, }; use crate::{ message_queue::MessageQueue, time::Time, voltage_detection::VoltageDetection, wifi::Internet, }; -static mut VOLTAGE_EVENTLOOP: Option>> = None; - fn main() { env::set_var("DEFMT_LOG", "trace"); env::set_var("RUST_BACKTRACE", "1"); @@ -73,26 +72,12 @@ fn main() { let _wifi = Internet::new().unwrap(); - let mut mq = MessageQueue::new(); - mq.init().unwrap(); - let mut time = Time::new(); time.sync().unwrap(); - let mut beep = Beep::new(); + sleep(Duration::from_millis(100)); - // let _voltage_subscription; - // if let Some(eventloop) = unsafe { VOLTAGE_EVENTLOOP.as_mut() } { - // _voltage_subscription = eventloop - // .subscribe(move |message: &VoltageDetectionWorker| { - // if let Ok(json_str) = serde_json::to_string(&message) { - // if let Err(err) = mq.publish("voltage", json_str.as_bytes()) { - // warn!("Can not publish message to MQTT. {}", err); - // } - // } - // }) - // .expect(" Listening Event Loop Failed"); - // } + let mut beep = Beep::new(); let mut dc_out_controller = DcOutController::get_instance().expect("Can not get DcOutController instance"); @@ -100,33 +85,93 @@ fn main() { .watch() .expect("Can not watch for dc_out_controller"); - let _dc_out_state_subscription; - if let Some(eventloop) = unsafe { DC_OUT_STATE_EVENT_LOOP.as_mut() } { - _dc_out_state_subscription = eventloop - .subscribe(move |message: &DcOutControllerState| { - info!("Event Loop Value"); - match message.status { - dc_out_controller::DcOutStatus::WaitingOff => { - beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.") + sleep(Duration::from_millis(100)); + + let mut mq = MessageQueue::new(); + mq.init().unwrap(); + + let _mq_subscription; + match mq.watch() { + Err(err) => { + error!("Can not watch MessageQueue. {}", err); + } + Ok(subscription) => _mq_subscription = subscription, + } + + let _voltage_subscription; + if let Some(voltage_event_loop) = unsafe { VOLTAGE_EVENTLOOP.as_mut() } { + if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } { + _voltage_subscription = voltage_event_loop + .subscribe(move |message: &VoltageDetectionWorker| { + if let Ok(json_str) = serde_json::to_string(&message) { + let result = mq_event_loop.post( + &MqDto { + topic: "voltage", + message: json_str.as_bytes(), + }, + None, + ); + match result { + Ok(success) => { + if !success { + warn!("post voltage to mq event failed.") + } + } + Err(err) => warn!("post voltage to mq event failed. {}", err), + } } - dc_out_controller::DcOutStatus::WaitingOn(_) => { - beep.play(ringtone::BATTERY_LOW).expect("Can not beep.") - } - dc_out_controller::DcOutStatus::TurningOff(_) => { - beep.play(ringtone::SHUTDOWN).expect("Can not beep.") - } - _ => {beep.play(ringtone::SILENCE).expect("Can not beep.")} - } - if let Err(err) = mq.publish("dcOut", message.to_json().as_bytes()) { - warn!("Can not publish message to MQTT. {}", err); - } - }) - .expect(" Listening Event Loop Failed"); + }) + .expect(" Listening Event Loop Failed"); + } else { + panic!("MQ_EVENTLOOP is undefined!"); + } } else { - error!("DC_OUT_STATE_EVENT_LOOP is None"); + panic!("VOLTAGE_EVENTLOOP is undefined!"); + } + + let _dc_out_state_subscription; + if let Some(dc_state_event_loop) = unsafe { DC_OUT_STATE_EVENT_LOOP.as_mut() } { + if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } { + _dc_out_state_subscription = dc_state_event_loop + .subscribe(move |message: &DcOutControllerState| { + info!("Event Loop Value"); + match message.status { + dc_out_controller::DcOutStatus::WaitingOff => { + beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.") + } + dc_out_controller::DcOutStatus::WaitingOn(_) => { + beep.play(ringtone::BATTERY_LOW).expect("Can not beep.") + } + dc_out_controller::DcOutStatus::TurningOff(_) => { + beep.play(ringtone::SHUTDOWN).expect("Can not beep.") + } + _ => beep.play(ringtone::SILENCE).expect("Can not beep."), + } + let result = mq_event_loop.post( + &MqDto { + topic: "dc_out_state", + message: message.to_json().as_bytes(), + }, + None, + ); + match result { + Ok(success) => { + if !success { + warn!("post dc_out_state event failed.") + } + } + Err(err) => warn!("post dc_out_state event failed. {}", err), + } + }) + .expect(" Listening Event Loop Failed"); + } else { + panic!("MQ_EVENTLOOP is undefined!"); + } + } else { + panic!("DC_OUT_STATE_EVENT_LOOP is undefined!"); } loop { - sleep(Duration::from_millis(1000)); + sleep(Duration::from_millis(100)); } } diff --git a/src/message_queue.rs b/src/message_queue.rs index 905eaac..09d5790 100644 --- a/src/message_queue.rs +++ b/src/message_queue.rs @@ -1,18 +1,35 @@ use std::thread; use anyhow::Result; -use embedded_svc::mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS}; -use esp_idf_svc::mqtt::client::{EspMqttClient, MqttClientConfiguration}; -use esp_idf_sys::EspError; +use embedded_svc::{ + event_bus::EventBus, + mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS}, +}; +use esp_idf_svc::{ + eventloop::{ + Background, EspBackgroundEventLoop, EspEventFetchData, EspEventLoop, EspEventPostData, + EspSubscription, EspTypedEventDeserializer, EspTypedEventSerializer, EspTypedEventSource, + User, + }, + mqtt::client::{EspMqttClient, MqttClientConfiguration}, +}; +use esp_idf_sys::{c_types, EspError}; use log::*; +use serde::{Deserialize, Serialize}; + +pub static mut MQ_EVENTLOOP: Option>> = None; pub struct MessageQueue { pub client: Option>>, + voltage_subscription: Option>>, } impl MessageQueue { pub fn new() -> MessageQueue { - return MessageQueue { client: None }; + return MessageQueue { + client: None, + voltage_subscription: None, + }; } pub fn init(&mut self) -> Result<()> { @@ -73,4 +90,56 @@ impl MessageQueue { ) .map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err)) } + + pub fn watch(mut self) -> anyhow::Result>> { + match EspBackgroundEventLoop::new(&Default::default()) { + Ok(eventloop) => unsafe { MQ_EVENTLOOP = Some(eventloop) }, + Err(err) => anyhow::bail!("Init Event Loop failed. {:?}", err), + } + if let Some(eventloop) = unsafe { MQ_EVENTLOOP.as_mut() } { + let subscription = eventloop + .subscribe(move |dto: &MqDto| { + info!( + "publish data: {:?}", + unsafe { + String::from_utf8_unchecked(dto.message.to_vec()) + } + ); + if let Err(err) = self.publish(dto.topic, dto.message) { + warn!("Can not publish message to MQTT. {}", err); + } + }) + .map_err(|err| anyhow::anyhow!(" Listening Event Loop Failed. {}", err))?; + // self.voltage_subscription = Some(subscription); + return anyhow::Ok(subscription); + } + anyhow::bail!("MQ_EVENTLOOP is Not Defined.") + } +} + +#[derive(Copy, Debug, Clone, Serialize, Deserialize)] +pub struct MqDto<'a, 'b> { + pub topic: &'a str, + pub message: &'b [u8], +} + +impl EspTypedEventSource for MqDto<'_, '_> { + fn source() -> *const c_types::c_char { + b"MqDto\0".as_ptr() as *const _ + } +} + +impl EspTypedEventSerializer> for MqDto<'_, '_> { + fn serialize(event: &MqDto, f: impl for<'a> FnOnce(&'a EspEventPostData) -> R) -> R { + f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) }) + } +} + +impl<'b, 'c> EspTypedEventDeserializer> for MqDto<'b, 'c> { + fn deserialize( + data: &EspEventFetchData, + f: &mut impl for<'a> FnMut(&'a MqDto<'b, 'c>) -> R, + ) -> R { + f(unsafe { data.as_payload() }) + } } diff --git a/src/voltage_detection.rs b/src/voltage_detection.rs index f952377..3e2caad 100644 --- a/src/voltage_detection.rs +++ b/src/voltage_detection.rs @@ -18,7 +18,7 @@ use esp_idf_hal::{ use esp_idf_svc::{ eventloop::{ EspEventFetchData, EspEventPostData, EspTypedEventDeserializer, EspTypedEventSerializer, - EspTypedEventSource, + EspTypedEventSource, EspEventLoop, Background, EspBackgroundEventLoop, }, timer::{EspTimer, EspTimerService}, }; @@ -26,7 +26,7 @@ use esp_idf_sys::c_types; use log::{info, warn}; use serde::{Deserialize, Serialize}; -use crate::VOLTAGE_EVENTLOOP; +pub static mut VOLTAGE_EVENTLOOP: Option>> = None; const ADAPTER_OFFSET: f32 = 12002f32 / 900f32; const BATTERY_OFFSET: f32 = 12002f32 / 900f32; @@ -53,6 +53,10 @@ impl VoltageDetection { } pub fn watching(&mut self) -> anyhow::Result<()> { + match EspBackgroundEventLoop::new(&Default::default()) { + Ok(eventloop) => unsafe { VOLTAGE_EVENTLOOP = Some(eventloop) }, + Err(err) => anyhow::bail!("Init Event Loop failed. {:?}", err), + } let worker = Arc::new(Mutex::new(self.worker)); let mut timer = EspTimerService::new()?.timer(move || { match worker.lock().as_mut() {