diff --git a/src-tauri/src/ambient_light/publisher.rs b/src-tauri/src/ambient_light/publisher.rs index 6a66e4a..fdd90df 100644 --- a/src-tauri/src/ambient_light/publisher.rs +++ b/src-tauri/src/ambient_light/publisher.rs @@ -1,4 +1,4 @@ -use std::{borrow::Borrow, collections::HashMap, io::Read, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use paris::warn; use tauri::async_runtime::RwLock; @@ -10,7 +10,7 @@ use tokio::{ use crate::{ ambient_light::{config, ConfigManager}, - rpc::MqttRpc, + led_color::LedColor, screenshot::LedSamplePoints, screenshot_manager::{self, ScreenshotManager}, }; @@ -103,52 +103,24 @@ impl LedColorsPublisher { let colors: Vec = colors.unwrap(); - // let color_len = colors.len(); - let display_led_offset = mappers - .clone() - .iter() - .flat_map(|mapper| [mapper.start, mapper.end]) - .min() - .unwrap(); + let colors_copy = colors.clone(); - for group in mappers.clone() { - if (group.start.abs_diff(group.end)) > colors.len() { - warn!( - "get_sorted_colors: color_index out of range. color_index: {}, strip len: {}, colors.len(): {}", - group.pos, - group.start.abs_diff(group.end), - colors.len() - ); - return; - } + let mappers = mappers.clone(); - let group_size = group.start.abs_diff(group.end); - let mut buffer = Vec::::with_capacity(group_size * 3); - - if group.end > group.start { - for i in group.pos-display_led_offset..group_size + group.pos-display_led_offset { - let bytes = colors[i].as_bytes(); - buffer.append(&mut bytes.to_vec()); + tokio::spawn(async move { + match Self::send_colors_by_display(colors, mappers).await { + Ok(_) => { + log::info!("sent colors: #{: >15}", display_id); } - } else { - for i in (group.pos-display_led_offset..group_size + group.pos-display_led_offset).rev() { - let bytes = colors[i].as_bytes(); - buffer.append(&mut bytes.to_vec()); - } - } - match Self::send_colors((group.start.min(group.end)) as u16, buffer).await { - Ok(_) => {} Err(err) => { - warn!("Failed to send colors: {}", err); + warn!("Failed to send colors: #{: >15}\t{}", display_id, err); } - }; - } - - log::info!("sent colors: #{: >15} {:?}", display_id, colors.len()); + } + }); match display_colors_tx.send(( display_id, - colors + colors_copy .into_iter() .map(|color| color.get_rgb()) .flatten() @@ -292,27 +264,6 @@ impl LedColorsPublisher { ); } }); - - // let rx = self.sorted_colors_rx.clone(); - // tokio::spawn(async move { - // let mut rx = rx.read().await.clone(); - // loop { - // if let Err(err) = rx.changed().await { - // warn!("rx changed error: {}", err); - // sleep(Duration::from_millis(1000)).await; - // continue; - // } - - // let colors = rx.borrow().clone(); - - // match Self::send_colors(colors).await { - // Ok(_) => {} - // Err(err) => { - // warn!("colors send failed: {}", err); - // } - // } - // } - // }); } pub async fn send_colors(offset: u16, mut payload: Vec) -> anyhow::Result<()> { @@ -329,6 +280,65 @@ impl LedColorsPublisher { Ok(()) } + pub async fn send_colors_by_display( + colors: Vec, + mappers: Vec, + ) -> anyhow::Result<()> { + // let color_len = colors.len(); + let display_led_offset = mappers + .clone() + .iter() + .flat_map(|mapper| [mapper.start, mapper.end]) + .min() + .unwrap(); + + let socket = UdpSocket::bind("0.0.0.0:0").await?; + for group in mappers.clone() { + if (group.start.abs_diff(group.end)) > colors.len() { + return Err(anyhow::anyhow!( + "get_sorted_colors: color_index out of range. color_index: {}, strip len: {}, colors.len(): {}", + group.pos, + group.start.abs_diff(group.end), + colors.len() + )); + } + + let group_size = group.start.abs_diff(group.end); + let mut buffer = Vec::::with_capacity(group_size * 3); + + if group.end > group.start { + for i in group.pos - display_led_offset..group_size + group.pos - display_led_offset + { + let bytes = colors[i].as_bytes(); + buffer.append(&mut bytes.to_vec()); + } + } else { + for i in (group.pos - display_led_offset + ..group_size + group.pos - display_led_offset) + .rev() + { + let bytes = colors[i].as_bytes(); + buffer.append(&mut bytes.to_vec()); + } + } + + let offset = group.start.min(group.end); + let mut tx_buffer = vec![2]; + tx_buffer.push((offset >> 8) as u8); + tx_buffer.push((offset & 0xff) as u8); + tx_buffer.append(&mut buffer); + socket.send_to(&tx_buffer, "192.168.31.206:23042").await?; + match Self::send_colors((group.start.min(group.end)) as u16, buffer).await { + Ok(_) => {} + Err(err) => { + warn!("Failed to send colors: {}", err); + } + }; + } + + Ok(()) + } + pub async fn clone_sorted_colors_receiver(&self) -> watch::Receiver> { self.sorted_colors_rx.read().await.clone() } diff --git a/src-tauri/src/led_color.rs b/src-tauri/src/led_color.rs index 539f4d7..87d49dc 100644 --- a/src-tauri/src/led_color.rs +++ b/src-tauri/src/led_color.rs @@ -1,5 +1,3 @@ -use std::ops::Index; - use color_space::{Hsv, Rgb}; use serde::Serialize; diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 3dd78b7..3d11300 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -11,6 +11,7 @@ mod screenshot_manager; use ambient_light::{Border, ColorCalibration, LedStripConfig, LedStripConfigGroup}; use display_info::DisplayInfo; use paris::{error, info, warn}; +use rpc::MqttRpc; use screenshot::Screenshot; use screenshot_manager::ScreenshotManager; use serde::{Deserialize, Serialize}; @@ -197,6 +198,8 @@ async fn main() { let led_color_publisher = ambient_light::LedColorsPublisher::global().await; led_color_publisher.start(); + let _mqtt = MqttRpc::global().await; + tauri::Builder::default() .invoke_handler(tauri::generate_handler![ greet, diff --git a/src-tauri/src/rpc/mqtt.rs b/src-tauri/src/rpc/mqtt.rs index 98e57b7..4bcd5ed 100644 --- a/src-tauri/src/rpc/mqtt.rs +++ b/src-tauri/src/rpc/mqtt.rs @@ -1,5 +1,5 @@ use paho_mqtt as mqtt; -use paris::{error, info, warn}; +use paris::{info, warn}; use serde_json::json; use std::time::Duration; use time::{format_description, OffsetDateTime}; @@ -9,8 +9,6 @@ use crate::ambient_light::{ColorCalibration, ConfigManager}; const DISPLAY_TOPIC: &'static str = "display-ambient-light/display"; const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop"; -const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/board/brightness"; -const BOARD_SEND_CMD: &'static str = "display-ambient-light/board/cmd"; const COLOR_CALIBRATION: &'static str = "display-ambient-light/desktop/color-calibration"; pub struct MqttRpc { @@ -43,7 +41,7 @@ impl MqttRpc { client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1); }); - client.set_connection_lost_callback(|client| { + client.set_connection_lost_callback(|_| { info!("MQTT server connection lost."); }); client.set_disconnected_callback(|_, a1, a2| { @@ -91,101 +89,12 @@ impl MqttRpc { Ok(Self { client }) } - pub async fn listen(&self) { - // let change_display_brightness_tx2 = self.change_display_brightness_tx.clone(); - // let message_tx_cloned = self.message_tx.clone(); - - // let mut stream = self.client.to_owned().get_stream(100); - - // while let Some(notification) = stream.next().await { - // match notification { - // Some(notification) => match notification.topic() { - // DISPLAY_BRIGHTNESS_TOPIC => { - // let payload_text = String::from_utf8(notification.payload().to_vec()); - // match payload_text { - // Ok(payload_text) => { - // let display_brightness: Result = - // serde_json::from_str(payload_text.as_str()); - // match display_brightness { - // Ok(display_brightness) => { - // match change_display_brightness_tx2.send(display_brightness) - // { - // Ok(_) => {} - // Err(err) => { - // warn!( - // "can not send display brightness to channel. {:?}", - // err - // ); - // } - // } - // } - // Err(err) => { - // warn!( - // "can not parse display brightness from payload. {:?}", - // err - // ); - // } - // } - // } - // Err(err) => { - // warn!("can not parse display brightness from payload. {:?}", err); - // } - // } - // } - // BOARD_SEND_CMD => { - // let payload_text = String::from_utf8(notification.payload().to_vec()); - // match payload_text { - // Ok(payload_text) => { - // let message: Result = - // serde_json::from_str(payload_text.as_str()); - // match message { - // Ok(message) => match message_tx_cloned.send(message) { - // Ok(_) => {} - // Err(err) => { - // warn!("can not send message to channel. {:?}", err); - // } - // }, - // Err(err) => { - // warn!("can not parse message from payload. {:?}", err); - // } - // } - // } - // Err(err) => { - // warn!("can not parse message from payload. {:?}", err); - // } - // } - // } - // _ => {} - // }, - // _ => { - // warn!("can not get notification from MQTT server."); - // } - // } - // } - } - pub async fn initialize(&self) -> anyhow::Result<()> { - // self.subscribe_board()?; - // self.subscribe_display()?; self.broadcast_desktop_online(); Self::publish_color_calibration_worker(); anyhow::Ok(()) } - fn subscribe_board(&self) -> anyhow::Result<()> { - self.client - .subscribe("display-ambient-light/board/#", mqtt::QOS_1) - .wait() - .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) - .map(|_| ()) - } - fn subscribe_display(&self) -> anyhow::Result<()> { - self.client - .subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1) - .wait() - .map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err)) - .map(|_| ()) - } fn publish_color_calibration_worker() { tokio::spawn(async move { let mqtt = Self::global().await; @@ -241,34 +150,7 @@ impl MqttRpc { }); } - pub async fn publish_led_sub_pixels(&self, payload: Vec) -> anyhow::Result<()> { - self.client - .publish(mqtt::Message::new( - "display-ambient-light/desktop/colors", - payload, - mqtt::QOS_1, - )) - .await - .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) - } - - // pub fn subscribe_change_display_brightness_rx( - // &self, - // ) -> broadcast::Receiver { - // self.change_display_brightness_tx.subscribe() - // } - pub async fn publish_desktop_cmd(&self, field: &str, payload: Vec) -> anyhow::Result<()> { - self.client - .publish(mqtt::Message::new( - format!("{}/{}", DESKTOP_TOPIC, field), - payload, - mqtt::QOS_1, - )) - .await - .map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error)) - } - - pub async fn publish_color_calibration(&self, payload: ColorCalibration) -> anyhow::Result<()> { + pub async fn publish_color_calibration(&self, payload: ColorCalibration) -> anyhow::Result<()> { self.client .publish(mqtt::Message::new( COLOR_CALIBRATION,