feat: MQTT client 改为 paho-mqtt. #5.
This commit is contained in:
		
							
								
								
									
										128
									
								
								src-tauri/Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										128
									
								
								src-tauri/Cargo.lock
									
									
									
										generated
									
									
									
								
							@@ -867,7 +867,6 @@ dependencies = [
 | 
			
		||||
 "paho-mqtt",
 | 
			
		||||
 "paris",
 | 
			
		||||
 "redb",
 | 
			
		||||
 "rumqttc",
 | 
			
		||||
 "scrap",
 | 
			
		||||
 "serde",
 | 
			
		||||
 "serde_json",
 | 
			
		||||
@@ -1028,7 +1027,7 @@ dependencies = [
 | 
			
		||||
 "futures-sink",
 | 
			
		||||
 "nanorand",
 | 
			
		||||
 "pin-project",
 | 
			
		||||
 "spin 0.9.4",
 | 
			
		||||
 "spin",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
@@ -2707,12 +2706,6 @@ dependencies = [
 | 
			
		||||
 "windows-sys 0.42.0",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "pollster"
 | 
			
		||||
version = "0.2.5"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "ppv-lite86"
 | 
			
		||||
version = "0.2.17"
 | 
			
		||||
@@ -3019,40 +3012,6 @@ dependencies = [
 | 
			
		||||
 "windows 0.37.0",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "ring"
 | 
			
		||||
version = "0.16.20"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "cc",
 | 
			
		||||
 "libc",
 | 
			
		||||
 "once_cell",
 | 
			
		||||
 "spin 0.5.2",
 | 
			
		||||
 "untrusted",
 | 
			
		||||
 "web-sys",
 | 
			
		||||
 "winapi 0.3.9",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "rumqttc"
 | 
			
		||||
version = "0.17.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "499b7ab08ffa5a722958b6ce1b7c0270bea30909f589d12c5ec3a051afe423fc"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "bytes",
 | 
			
		||||
 "flume",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "http",
 | 
			
		||||
 "log",
 | 
			
		||||
 "pollster",
 | 
			
		||||
 "rustls-native-certs",
 | 
			
		||||
 "rustls-pemfile 0.3.0",
 | 
			
		||||
 "thiserror",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "tokio-rustls",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "rustc_version"
 | 
			
		||||
version = "0.3.3"
 | 
			
		||||
@@ -3071,48 +3030,6 @@ dependencies = [
 | 
			
		||||
 "semver 1.0.16",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "rustls"
 | 
			
		||||
version = "0.20.7"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "log",
 | 
			
		||||
 "ring",
 | 
			
		||||
 "sct",
 | 
			
		||||
 "webpki",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "rustls-native-certs"
 | 
			
		||||
version = "0.6.2"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "openssl-probe",
 | 
			
		||||
 "rustls-pemfile 1.0.1",
 | 
			
		||||
 "schannel",
 | 
			
		||||
 "security-framework",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "rustls-pemfile"
 | 
			
		||||
version = "0.3.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "base64",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "rustls-pemfile"
 | 
			
		||||
version = "1.0.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "base64",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "rustversion"
 | 
			
		||||
version = "1.0.11"
 | 
			
		||||
@@ -3180,16 +3097,6 @@ dependencies = [
 | 
			
		||||
 "winapi 0.2.8",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "sct"
 | 
			
		||||
version = "0.7.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "ring",
 | 
			
		||||
 "untrusted",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "security-framework"
 | 
			
		||||
version = "2.7.0"
 | 
			
		||||
@@ -3488,12 +3395,6 @@ dependencies = [
 | 
			
		||||
 "system-deps 5.0.0",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "spin"
 | 
			
		||||
version = "0.5.2"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "spin"
 | 
			
		||||
version = "0.9.4"
 | 
			
		||||
@@ -4033,17 +3934,6 @@ dependencies = [
 | 
			
		||||
 "syn",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "tokio-rustls"
 | 
			
		||||
version = "0.23.4"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "rustls",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "webpki",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "toml"
 | 
			
		||||
version = "0.5.10"
 | 
			
		||||
@@ -4185,12 +4075,6 @@ version = "0.0.2"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "74c1aa4511c38276c548406f0b1f5f8b793f000cfb51e18f278a102abd057e81"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "untrusted"
 | 
			
		||||
version = "0.7.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "url"
 | 
			
		||||
version = "2.3.1"
 | 
			
		||||
@@ -4432,16 +4316,6 @@ dependencies = [
 | 
			
		||||
 "libwebp-sys",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "webpki"
 | 
			
		||||
version = "0.22.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "ring",
 | 
			
		||||
 "untrusted",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "webview2-com"
 | 
			
		||||
version = "0.19.1"
 | 
			
		||||
 
 | 
			
		||||
@@ -28,7 +28,6 @@ tokio = { version = "1.22.0", features = ["full"] }
 | 
			
		||||
tracing = "0.1.37"
 | 
			
		||||
tracing-subscriber = "0.3.16"
 | 
			
		||||
hex = "0.4.3"
 | 
			
		||||
rumqttc = "0.17.0"
 | 
			
		||||
time = { version = "0.3.17", features = ["formatting"] }
 | 
			
		||||
color_space = "0.5.3"
 | 
			
		||||
futures = "0.3.25"
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
use paris::error;
 | 
			
		||||
use tokio::sync::{broadcast, OnceCell};
 | 
			
		||||
 | 
			
		||||
use crate::{display, picker::led_color::LedColor, models};
 | 
			
		||||
use crate::{display, models, picker::led_color::LedColor};
 | 
			
		||||
 | 
			
		||||
use super::mqtt::MqttRpc;
 | 
			
		||||
 | 
			
		||||
@@ -14,11 +14,13 @@ impl Manager {
 | 
			
		||||
    pub async fn global() -> &'static Self {
 | 
			
		||||
        static RPC_MANAGER: OnceCell<Manager> = OnceCell::const_new();
 | 
			
		||||
 | 
			
		||||
        RPC_MANAGER.get_or_init(|| Manager::new()).await
 | 
			
		||||
        RPC_MANAGER
 | 
			
		||||
            .get_or_init(|| async { Manager::new().await.unwrap() })
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn new() -> Self {
 | 
			
		||||
        let mut mqtt = MqttRpc::new();
 | 
			
		||||
    pub async fn new() -> anyhow::Result<Self> {
 | 
			
		||||
        let mqtt = MqttRpc::new().await?;
 | 
			
		||||
        let initialized = match mqtt.initialize().await {
 | 
			
		||||
            Ok(_) => true,
 | 
			
		||||
            Err(err) => {
 | 
			
		||||
@@ -26,10 +28,10 @@ impl Manager {
 | 
			
		||||
                false
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        Self {
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            client: mqtt,
 | 
			
		||||
            initialized,
 | 
			
		||||
        }
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn listen(&self) {
 | 
			
		||||
 
 | 
			
		||||
@@ -1,174 +1,184 @@
 | 
			
		||||
use crate::{display, models};
 | 
			
		||||
use image::EncodableLayout;
 | 
			
		||||
use paris::{warn, info, error};
 | 
			
		||||
use rumqttc::{
 | 
			
		||||
    AsyncClient, ConnectReturnCode, Event, EventLoop, Incoming, MqttOptions, Outgoing, QoS,
 | 
			
		||||
};
 | 
			
		||||
use futures::StreamExt;
 | 
			
		||||
use paho_mqtt as mqtt;
 | 
			
		||||
use paris::{error, info, warn};
 | 
			
		||||
use serde_json::json;
 | 
			
		||||
use std::{borrow::Borrow, rc::Rc, sync::Arc, time::Duration};
 | 
			
		||||
use tauri::async_runtime::{Mutex, TokioJoinHandle};
 | 
			
		||||
use time::{format_description, OffsetDateTime};
 | 
			
		||||
use tokio::{sync::broadcast, task, time::sleep};
 | 
			
		||||
 | 
			
		||||
const DISPLAY_TOPIC: &'static str = "display-ambient-light/display";
 | 
			
		||||
const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop";
 | 
			
		||||
const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness";
 | 
			
		||||
const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd";
 | 
			
		||||
const DESKTOP_SEND_CMD: &'static str = "display-ambient-light/desktop/cmd";
 | 
			
		||||
 | 
			
		||||
pub struct MqttRpc {
 | 
			
		||||
    client: AsyncClient,
 | 
			
		||||
    client: mqtt::AsyncClient,
 | 
			
		||||
    change_display_brightness_tx: broadcast::Sender<display::DisplayBrightness>,
 | 
			
		||||
    message_tx: broadcast::Sender<models::MqMessage>,
 | 
			
		||||
    eventloop: Arc<Mutex<EventLoop>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl MqttRpc {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883);
 | 
			
		||||
        options.set_keep_alive(Duration::from_secs(5));
 | 
			
		||||
        options.set_clean_session(false);
 | 
			
		||||
    pub async fn new() -> anyhow::Result<Self> {
 | 
			
		||||
        let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883")
 | 
			
		||||
            .map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?;
 | 
			
		||||
 | 
			
		||||
        let (client, mut eventloop) = AsyncClient::new(options, 10);
 | 
			
		||||
        client.set_connected_callback(|client| {
 | 
			
		||||
            info!("MQTT server connected.");
 | 
			
		||||
 | 
			
		||||
            client.subscribe("display-ambient-light/board/#", mqtt::QOS_1);
 | 
			
		||||
 | 
			
		||||
            client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1);
 | 
			
		||||
        });
 | 
			
		||||
        client.set_connection_lost_callback(|client| {
 | 
			
		||||
            info!("MQTT server connection lost.");
 | 
			
		||||
        });
 | 
			
		||||
        client.set_disconnected_callback(|_, a1, a2| {
 | 
			
		||||
            info!("MQTT server disconnected. {:?} {:?}", a1, a2);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        let mut last_will_payload = serde_json::Map::new();
 | 
			
		||||
        last_will_payload.insert("message".to_string(), json!("offline"));
 | 
			
		||||
        last_will_payload.insert(
 | 
			
		||||
            "time".to_string(),
 | 
			
		||||
            serde_json::Value::String(
 | 
			
		||||
                OffsetDateTime::now_utc()
 | 
			
		||||
                    .format(
 | 
			
		||||
                        &format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second]")
 | 
			
		||||
                            .unwrap(),
 | 
			
		||||
                    )
 | 
			
		||||
                    .unwrap()
 | 
			
		||||
                    .to_string(),
 | 
			
		||||
            ),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let last_will = mqtt::Message::new(
 | 
			
		||||
            format!("{}/status", DESKTOP_TOPIC),
 | 
			
		||||
            serde_json::to_string(&last_will_payload)
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .as_bytes(),
 | 
			
		||||
            mqtt::QOS_1,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let connect_options = mqtt::ConnectOptionsBuilder::new()
 | 
			
		||||
            .keep_alive_interval(Duration::from_secs(5))
 | 
			
		||||
            .automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5))
 | 
			
		||||
            .finalize();
 | 
			
		||||
 | 
			
		||||
        let token = client.connect(connect_options);
 | 
			
		||||
 | 
			
		||||
        token.await.map_err(|err| {
 | 
			
		||||
            anyhow::anyhow!(
 | 
			
		||||
                "can not connect MQTT server. wait for connect token failed. {:?}",
 | 
			
		||||
                err
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
 | 
			
		||||
        let (change_display_brightness_tx, _) =
 | 
			
		||||
            broadcast::channel::<display::DisplayBrightness>(16);
 | 
			
		||||
        let (message_tx, _) = broadcast::channel::<models::MqMessage>(32);
 | 
			
		||||
 | 
			
		||||
        Self {
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            client,
 | 
			
		||||
            change_display_brightness_tx,
 | 
			
		||||
            message_tx,
 | 
			
		||||
            eventloop: Arc::new(Mutex::new(eventloop)),
 | 
			
		||||
        }
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn listen(&self) {
 | 
			
		||||
        let change_display_brightness_tx2 = self.change_display_brightness_tx.clone();
 | 
			
		||||
        let message_tx_cloned = self.message_tx.clone();
 | 
			
		||||
 | 
			
		||||
        let mut eventloop = self.eventloop.clone().lock_owned().await;
 | 
			
		||||
        let mut stream = self.client.to_owned().get_stream(100);
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            match eventloop.poll().await {
 | 
			
		||||
                Ok(notification) => {
 | 
			
		||||
                    if let Event::Incoming(notification) = notification {
 | 
			
		||||
                        if let Incoming::Publish(notification) = notification {
 | 
			
		||||
                            match notification.topic.as_str() {
 | 
			
		||||
                                DISPLAY_BRIGHTNESS_TOPIC => {
 | 
			
		||||
                                    let payload_text = String::from_utf8(
 | 
			
		||||
                                        notification.payload.as_bytes().to_owned(),
 | 
			
		||||
                                    );
 | 
			
		||||
                                    match payload_text {
 | 
			
		||||
                                        Ok(payload_text) => {
 | 
			
		||||
                                            let display_brightness: Result<
 | 
			
		||||
                                                display::DisplayBrightness,
 | 
			
		||||
                                                _,
 | 
			
		||||
                                            > = serde_json::from_str(payload_text.as_str());
 | 
			
		||||
                                            match display_brightness {
 | 
			
		||||
                                                Ok(display_brightness) => {
 | 
			
		||||
                                                    match change_display_brightness_tx2
 | 
			
		||||
                                                        .send(display_brightness)
 | 
			
		||||
                                                    {
 | 
			
		||||
                                                        Ok(_) => {}
 | 
			
		||||
                                                        Err(err) => {
 | 
			
		||||
                                                            warn!(
 | 
			
		||||
                                                                "can not broadcast display brightness. {:?}",
 | 
			
		||||
                                                                err
 | 
			
		||||
                                                            );
 | 
			
		||||
                                                        }
 | 
			
		||||
                                                    };
 | 
			
		||||
                                                }
 | 
			
		||||
                                                Err(err) => {
 | 
			
		||||
                                                    warn!(
 | 
			
		||||
                                                "can not deserialize display brightness. {:?}",
 | 
			
		||||
                                                err
 | 
			
		||||
                                            );
 | 
			
		||||
                                                }
 | 
			
		||||
                                            }
 | 
			
		||||
                                        }
 | 
			
		||||
                                        Err(err) => {
 | 
			
		||||
                                            warn!(
 | 
			
		||||
                                                "can not decode display brightness message. {:?}",
 | 
			
		||||
                                                err
 | 
			
		||||
                                            );
 | 
			
		||||
                                        }
 | 
			
		||||
                                    };
 | 
			
		||||
                                }
 | 
			
		||||
                                BOARD_SEND_CMD => {
 | 
			
		||||
                                    let payload_text = String::from_utf8(
 | 
			
		||||
                                        notification.payload.as_bytes().to_owned(),
 | 
			
		||||
                                    );
 | 
			
		||||
                                    match payload_text {
 | 
			
		||||
                                        Ok(payload_text) => {
 | 
			
		||||
                                            let message: Result<models::MqMessage, _> =
 | 
			
		||||
                                                serde_json::from_str(payload_text.as_str());
 | 
			
		||||
                                            match message {
 | 
			
		||||
                                                Ok(message) => {
 | 
			
		||||
                                                    match message_tx_cloned.send(message) {
 | 
			
		||||
                                                        Ok(_) => {}
 | 
			
		||||
                                                        Err(err) => {
 | 
			
		||||
                                                            warn!(
 | 
			
		||||
                                                                "can not broadcast mq message. {:?}",
 | 
			
		||||
                                                                err
 | 
			
		||||
                                                            );
 | 
			
		||||
                                                        }
 | 
			
		||||
                                                    };
 | 
			
		||||
                                                }
 | 
			
		||||
                                                Err(err) => {
 | 
			
		||||
                                                    warn!(
 | 
			
		||||
                                                        "can not deserialize mq message. {:?}",
 | 
			
		||||
        while let Some(notification) = stream.next().await {
 | 
			
		||||
            match notification {
 | 
			
		||||
                Some(notification) => match notification.topic() {
 | 
			
		||||
                    DISPLAY_BRIGHTNESS_TOPIC => {
 | 
			
		||||
                        let payload_text = String::from_utf8(notification.payload().to_vec());
 | 
			
		||||
                        match payload_text {
 | 
			
		||||
                            Ok(payload_text) => {
 | 
			
		||||
                                let display_brightness: Result<display::DisplayBrightness, _> =
 | 
			
		||||
                                    serde_json::from_str(payload_text.as_str());
 | 
			
		||||
                                match display_brightness {
 | 
			
		||||
                                    Ok(display_brightness) => {
 | 
			
		||||
                                        match change_display_brightness_tx2.send(display_brightness)
 | 
			
		||||
                                        {
 | 
			
		||||
                                            Ok(_) => {}
 | 
			
		||||
                                            Err(err) => {
 | 
			
		||||
                                                warn!(
 | 
			
		||||
                                                        "can not send display brightness to channel. {:?}",
 | 
			
		||||
                                                        err
 | 
			
		||||
                                                    );
 | 
			
		||||
                                                }
 | 
			
		||||
                                            }
 | 
			
		||||
                                        }
 | 
			
		||||
                                        Err(err) => {
 | 
			
		||||
                                            warn!("can not decode mq message message. {:?}", err);
 | 
			
		||||
                                        }
 | 
			
		||||
                                    };
 | 
			
		||||
                                }
 | 
			
		||||
                                &_ => {}
 | 
			
		||||
                            };
 | 
			
		||||
                        } else if let Incoming::ConnAck(connAck) = notification {
 | 
			
		||||
                            if connAck.code == ConnectReturnCode::Success {
 | 
			
		||||
                                match self.initialize().await {
 | 
			
		||||
                                    Ok(_) => {
 | 
			
		||||
                                        info!("resubscribe topics!");
 | 
			
		||||
                                    }
 | 
			
		||||
                                    Err(err) => {
 | 
			
		||||
                                        info!("resubscribe topics failed! {:?}", err);
 | 
			
		||||
                                        warn!(
 | 
			
		||||
                                            "can not parse display brightness from payload. {:?}",
 | 
			
		||||
                                            err
 | 
			
		||||
                                        );
 | 
			
		||||
                                    }
 | 
			
		||||
                                };
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                            Err(err) => {
 | 
			
		||||
                                warn!("can not parse display brightness from payload. {:?}", err);
 | 
			
		||||
                            }
 | 
			
		||||
                        } else if let Incoming::Disconnect = notification {
 | 
			
		||||
                            error!("MQTT Disconnected!");
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                Err(err) => {
 | 
			
		||||
                    println!("MQTT Error Event = {:?}", err);
 | 
			
		||||
                    BOARD_SEND_CMD => {
 | 
			
		||||
                        let payload_text = String::from_utf8(notification.payload().to_vec());
 | 
			
		||||
                        match payload_text {
 | 
			
		||||
                            Ok(payload_text) => {
 | 
			
		||||
                                let message: Result<models::MqMessage, _> =
 | 
			
		||||
                                    serde_json::from_str(payload_text.as_str());
 | 
			
		||||
                                match message {
 | 
			
		||||
                                    Ok(message) => match message_tx_cloned.send(message) {
 | 
			
		||||
                                        Ok(_) => {}
 | 
			
		||||
                                        Err(err) => {
 | 
			
		||||
                                            warn!("can not send message to channel. {:?}", err);
 | 
			
		||||
                                        }
 | 
			
		||||
                                    },
 | 
			
		||||
                                    Err(err) => {
 | 
			
		||||
                                        warn!("can not parse message from payload. {:?}", err);
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                            Err(err) => {
 | 
			
		||||
                                warn!("can not parse message from payload. {:?}", err);
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    _ => {}
 | 
			
		||||
                },
 | 
			
		||||
                _ => {
 | 
			
		||||
                    warn!("can not get notification from MQTT server.");
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn initialize(&self) -> anyhow::Result<()> {
 | 
			
		||||
        self.subscribe_board().await?;
 | 
			
		||||
        self.subscribe_display().await?;
 | 
			
		||||
        // self.subscribe_board()?;
 | 
			
		||||
        // self.subscribe_display()?;
 | 
			
		||||
        self.broadcast_desktop_online();
 | 
			
		||||
        anyhow::Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn subscribe_board(&self) -> anyhow::Result<()> {
 | 
			
		||||
    fn subscribe_board(&self) -> anyhow::Result<()> {
 | 
			
		||||
        self.client
 | 
			
		||||
            .subscribe("display-ambient-light/board/#", QoS::AtMostOnce)
 | 
			
		||||
            .await
 | 
			
		||||
            .subscribe("display-ambient-light/board/#", mqtt::QOS_1)
 | 
			
		||||
            .wait()
 | 
			
		||||
            .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err))
 | 
			
		||||
            .map(|_| ())
 | 
			
		||||
    }
 | 
			
		||||
    async fn subscribe_display(&self) -> anyhow::Result<()> {
 | 
			
		||||
    fn subscribe_display(&self) -> anyhow::Result<()> {
 | 
			
		||||
        self.client
 | 
			
		||||
            .subscribe(format!("{}/#", DISPLAY_TOPIC), QoS::AtMostOnce)
 | 
			
		||||
            .await
 | 
			
		||||
            .subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1)
 | 
			
		||||
            .wait()
 | 
			
		||||
            .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err))
 | 
			
		||||
            .map(|_| ())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn broadcast_desktop_online(&self) {
 | 
			
		||||
@@ -179,15 +189,12 @@ impl MqttRpc {
 | 
			
		||||
                    .format(&format_description::well_known::Iso8601::DEFAULT)
 | 
			
		||||
                {
 | 
			
		||||
                    Ok(now_str) => {
 | 
			
		||||
                        match client
 | 
			
		||||
                            .publish(
 | 
			
		||||
                                "display-ambient-light/desktop/online",
 | 
			
		||||
                                QoS::AtLeastOnce,
 | 
			
		||||
                                false,
 | 
			
		||||
                                now_str.as_bytes(),
 | 
			
		||||
                            )
 | 
			
		||||
                            .await
 | 
			
		||||
                        {
 | 
			
		||||
                        let msg = mqtt::Message::new(
 | 
			
		||||
                            "display-ambient-light/desktop/online",
 | 
			
		||||
                            now_str.as_bytes(),
 | 
			
		||||
                            mqtt::QOS_0,
 | 
			
		||||
                        );
 | 
			
		||||
                        match client.publish(msg).await {
 | 
			
		||||
                            Ok(_) => {}
 | 
			
		||||
                            Err(error) => {
 | 
			
		||||
                                warn!("can not publish last online time. {}", error)
 | 
			
		||||
@@ -205,12 +212,11 @@ impl MqttRpc {
 | 
			
		||||
 | 
			
		||||
    pub async fn publish_led_sub_pixels(&self, payload: Vec<u8>) -> anyhow::Result<()> {
 | 
			
		||||
        self.client
 | 
			
		||||
            .publish(
 | 
			
		||||
            .publish(mqtt::Message::new(
 | 
			
		||||
                "display-ambient-light/desktop/colors",
 | 
			
		||||
                rumqttc::QoS::AtLeastOnce,
 | 
			
		||||
                false,
 | 
			
		||||
                payload,
 | 
			
		||||
            )
 | 
			
		||||
                mqtt::QOS_1,
 | 
			
		||||
            ))
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error))
 | 
			
		||||
    }
 | 
			
		||||
@@ -224,12 +230,11 @@ impl MqttRpc {
 | 
			
		||||
        let str = serde_json::to_string(&msg)
 | 
			
		||||
            .map_err(|err| anyhow::anyhow!("can not serialize {:?}. {:?}", msg, err))?;
 | 
			
		||||
        self.client
 | 
			
		||||
            .publish(
 | 
			
		||||
            .publish(mqtt::Message::new(
 | 
			
		||||
                DESKTOP_SEND_CMD,
 | 
			
		||||
                rumqttc::QoS::AtLeastOnce,
 | 
			
		||||
                false,
 | 
			
		||||
                str.as_bytes(),
 | 
			
		||||
            )
 | 
			
		||||
                mqtt::QOS_1,
 | 
			
		||||
            ))
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error))
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user