diff --git a/src/main.rs b/src/main.rs index c3309b3..ad82b7e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,11 @@ use esp_idf_svc::eventloop::{Background, EspBackgroundEventLoop, EspEventLoop}; use esp_idf_sys::{self as _}; use log::*; use std::{ + borrow::Borrow, env, + sync::{Arc, Mutex}, thread::{self, sleep}, - time::Duration, borrow::Borrow, + time::Duration, }; mod beep; @@ -19,7 +21,7 @@ mod wifi; use crate::{ beep::{ringtone, Beep}, dc_out_controller::{DcOutController, DcOutControllerState, DC_OUT_STATE_EVENT_LOOP}, - message_queue::{MqDto, MQ_EVENTLOOP}, + message_queue::MqDto, voltage_detection::{VoltageDetectionWorker, VOLTAGE_EVENTLOOP}, }; use crate::{ @@ -41,13 +43,6 @@ fn main() { let peripherals = esp_idf_hal::peripherals::Peripherals::take().unwrap(); let blink_pin = peripherals.pins.gpio5; - let beep_pin = peripherals.pins.gpio6; - let ledc_timer0 = peripherals.ledc.timer0; - let ledc_channel0 = peripherals.ledc.channel0; - let dc_out_ctl_pin = peripherals.pins.gpio3; - let i2c0 = peripherals.i2c0; - let sda_pin = peripherals.pins.gpio4; - let scl_pin = peripherals.pins.gpio10; info!("About to start a background event loop"); match EspBackgroundEventLoop::new(&Default::default()) { @@ -87,86 +82,81 @@ fn main() { sleep(Duration::from_millis(100)); - let mut mq = MessageQueue::new(); - mq.init().unwrap(); + let mut _mq = MessageQueue::new(); let _mq_subscription; - match mq.watch() { + match _mq.watch() { Err(err) => { error!("Can not watch MessageQueue. {}", err); } Ok(subscription) => _mq_subscription = subscription, } + let _mq_tx_for_voltage = _mq.tx.clone(); + let _mq_tx_for_dc_out_state = _mq.tx.clone(); + let _voltage_subscription; if let Some(voltage_event_loop) = unsafe { VOLTAGE_EVENTLOOP.as_mut() } { - if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } { - _voltage_subscription = voltage_event_loop - .subscribe(move |message: &VoltageDetectionWorker| { - if let Ok(json_str) = serde_json::to_string(&message) { - let result = mq_event_loop.post( - &MqDto { - topic: "voltage", - message: json_str.as_bytes(), - }, - None, - ); - match result { - Ok(success) => { - if !success { - warn!("post voltage to mq event failed.") + _voltage_subscription = voltage_event_loop + .subscribe(move |message: &VoltageDetectionWorker| { + if let Ok(json_str) = serde_json::to_string(&message) { + match _mq_tx_for_voltage.lock() { + Ok(tx) => { + let result = tx.send(MqDto { + topic: "voltage".to_string(), + message: json_str, + }); + match result { + Ok(()) => { + warn!("send voltage to mq message failed.") } + Err(err) => warn!("send voltage to mq message failed. {}", err), } - Err(err) => warn!("post voltage to mq event failed. {}", err), } + Err(err) => warn!("send voltage to mq message failed. {}", err), } - }) - .expect(" Listening Event Loop Failed"); - } else { - panic!("MQ_EVENTLOOP is undefined!"); - } + } + }) + .expect(" Listening Event Loop Failed"); } else { panic!("VOLTAGE_EVENTLOOP is undefined!"); } let _dc_out_state_subscription; if let Some(dc_state_event_loop) = unsafe { DC_OUT_STATE_EVENT_LOOP.as_mut() } { - if let Some(mq_event_loop) = unsafe { MQ_EVENTLOOP.as_mut() } { - _dc_out_state_subscription = dc_state_event_loop - .subscribe(move |message: &DcOutControllerState| { - info!("Event Loop Value"); - match message.status { - dc_out_controller::DcOutStatus::WaitingOff => { - beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.") - } - dc_out_controller::DcOutStatus::WaitingOn(_) => { - beep.play(ringtone::BATTERY_LOW).expect("Can not beep.") - } - dc_out_controller::DcOutStatus::TurningOff(_) => { - beep.play(ringtone::SHUTDOWN).expect("Can not beep.") - } - _ => beep.play(ringtone::SILENCE).expect("Can not beep."), + _dc_out_state_subscription = dc_state_event_loop + .subscribe(move |message: &DcOutControllerState| { + info!("Event Loop Value"); + match message.status { + dc_out_controller::DcOutStatus::WaitingOff => { + beep.play(ringtone::ADAPTER_DOWN).expect("Can not beep.") } - let result = mq_event_loop.post( - &MqDto { - topic: "dc_out_state", - message: message.to_json().as_bytes(), - }, - None, - ); - match result { - Ok(success) => { - if !success { - warn!("post dc_out_state event failed.") + dc_out_controller::DcOutStatus::WaitingOn(_) => { + beep.play(ringtone::BATTERY_LOW).expect("Can not beep.") + } + dc_out_controller::DcOutStatus::TurningOff(_) => { + beep.play(ringtone::SHUTDOWN).expect("Can not beep.") + } + _ => beep.play(ringtone::SILENCE).expect("Can not beep."), + } + match _mq_tx_for_dc_out_state.lock() { + Ok(tx) => { + let result = tx.send(MqDto { + topic: "dc_out_state".to_string(), + message: message.to_json(), + }); + + match result { + Ok(_) => { + info!("send dc_out_state message success.") } + Err(err) => warn!("send dc_out_state message failed. {}", err), } - Err(err) => warn!("post dc_out_state event failed. {}", err), } - }) - .expect(" Listening Event Loop Failed"); - } else { - panic!("MQ_EVENTLOOP is undefined!"); - } + Err(err) => warn!("send dc_out_state to mq message failed. {}", err), + } + }) + .expect(" Listening Event Loop Failed"); } else { panic!("DC_OUT_STATE_EVENT_LOOP is undefined!"); } diff --git a/src/message_queue.rs b/src/message_queue.rs index 09d5790..5c95911 100644 --- a/src/message_queue.rs +++ b/src/message_queue.rs @@ -1,38 +1,32 @@ -use std::thread; +use std::{ + sync::{ + mpsc::{self, Receiver, Sender}, + Arc, Mutex, + }, + thread::{self, spawn}, + time::Duration, +}; use anyhow::Result; -use embedded_svc::{ - event_bus::EventBus, - mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS}, -}; +use embedded_svc::mqtt::client::{utils::ConnState, Client, Connection, MessageImpl, Publish, QoS}; use esp_idf_svc::{ - eventloop::{ - Background, EspBackgroundEventLoop, EspEventFetchData, EspEventLoop, EspEventPostData, - EspSubscription, EspTypedEventDeserializer, EspTypedEventSerializer, EspTypedEventSource, - User, - }, + eventloop::{Background, EspEventLoop}, mqtt::client::{EspMqttClient, MqttClientConfiguration}, }; -use esp_idf_sys::{c_types, EspError}; +use esp_idf_sys::EspError; use log::*; -use serde::{Deserialize, Serialize}; - -pub static mut MQ_EVENTLOOP: Option>> = None; pub struct MessageQueue { - pub client: Option>>, - voltage_subscription: Option>>, + pub tx: Arc>>, + pub rx: Arc>>, } -impl MessageQueue { - pub fn new() -> MessageQueue { - return MessageQueue { - client: None, - voltage_subscription: None, - }; - } +pub struct MessageQueueWatcher { + pub client: EspMqttClient>, +} - pub fn init(&mut self) -> Result<()> { +impl MessageQueueWatcher { + pub fn new() -> Result { let conf = MqttClientConfiguration { client_id: Some("rust-esp32-std-demo"), crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), @@ -74,14 +68,11 @@ impl MessageQueue { info!("Published a hello message to topic \"esp32-c3-rust-wifi-demo/ping\""); - self.client = Some(client); - anyhow::Ok(()) + anyhow::Ok(Self { client }) } pub fn publish(&mut self, topic: &str, bytes: &[u8]) -> Result { self.client - .as_mut() - .unwrap() .publish( format!("{}/{}", "ups_0_2", topic).as_str(), QoS::AtMostOnce, @@ -90,56 +81,35 @@ impl MessageQueue { ) .map_err(|err| anyhow::anyhow!("publish message to queue was failed!. {}", err)) } +} - pub fn watch(mut self) -> anyhow::Result>> { - match EspBackgroundEventLoop::new(&Default::default()) { - Ok(eventloop) => unsafe { MQ_EVENTLOOP = Some(eventloop) }, - Err(err) => anyhow::bail!("Init Event Loop failed. {:?}", err), - } - if let Some(eventloop) = unsafe { MQ_EVENTLOOP.as_mut() } { - let subscription = eventloop - .subscribe(move |dto: &MqDto| { - info!( - "publish data: {:?}", - unsafe { - String::from_utf8_unchecked(dto.message.to_vec()) - } - ); - if let Err(err) = self.publish(dto.topic, dto.message) { - warn!("Can not publish message to MQTT. {}", err); - } - }) - .map_err(|err| anyhow::anyhow!(" Listening Event Loop Failed. {}", err))?; - // self.voltage_subscription = Some(subscription); - return anyhow::Ok(subscription); - } - anyhow::bail!("MQ_EVENTLOOP is Not Defined.") +impl MessageQueue { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(); + return MessageQueue { + tx: Arc::new(Mutex::new(tx)), + rx: Arc::new(Mutex::new(rx)), + }; + } + + pub fn watch(&mut self) -> anyhow::Result<()> { + let mut watcher = MessageQueueWatcher::new() + .map_err(|err| anyhow::anyhow!("Create MessageQueueWatcher failed. {}", err))?; + let rx = self.rx.to_owned(); + // let (tx, rx) = mpsc::channel::(); + spawn(move || loop { + if let Ok(dto) = rx.lock().unwrap().recv_timeout(Duration::from_secs(1)) { + if let Err(err) = watcher.publish(dto.topic.as_str(), dto.message.as_bytes()) { + warn!("Can not publish message to MQTT. {}", err); + } + } + }); + anyhow::Ok(()) } } -#[derive(Copy, Debug, Clone, Serialize, Deserialize)] -pub struct MqDto<'a, 'b> { - pub topic: &'a str, - pub message: &'b [u8], -} - -impl EspTypedEventSource for MqDto<'_, '_> { - fn source() -> *const c_types::c_char { - b"MqDto\0".as_ptr() as *const _ - } -} - -impl EspTypedEventSerializer> for MqDto<'_, '_> { - fn serialize(event: &MqDto, f: impl for<'a> FnOnce(&'a EspEventPostData) -> R) -> R { - f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) }) - } -} - -impl<'b, 'c> EspTypedEventDeserializer> for MqDto<'b, 'c> { - fn deserialize( - data: &EspEventFetchData, - f: &mut impl for<'a> FnMut(&'a MqDto<'b, 'c>) -> R, - ) -> R { - f(unsafe { data.as_payload() }) - } +#[derive(Debug, Clone)] +pub struct MqDto { + pub message: String, + pub topic: String, }