14 Commits

22 changed files with 1905 additions and 1360 deletions

View File

@@ -14,20 +14,20 @@
"@solidjs/router": "^0.8.2", "@solidjs/router": "^0.8.2",
"@tauri-apps/api": "^1.3.0", "@tauri-apps/api": "^1.3.0",
"debug": "^4.3.4", "debug": "^4.3.4",
"solid-icons": "^1.0.4", "solid-icons": "^1.0.8",
"solid-js": "^1.7.4", "solid-js": "^1.7.6",
"solid-tippy": "^0.2.1", "solid-tippy": "^0.2.1",
"tippy.js": "^6.3.7" "tippy.js": "^6.3.7"
}, },
"devDependencies": { "devDependencies": {
"@tauri-apps/cli": "^1.3.0", "@tauri-apps/cli": "^1.3.1",
"@types/debug": "^4.1.7", "@types/debug": "^4.1.8",
"@types/node": "^18.16.3", "@types/node": "^18.16.17",
"autoprefixer": "^10.4.14", "autoprefixer": "^10.4.14",
"postcss": "^8.4.23", "postcss": "^8.4.24",
"tailwindcss": "^3.3.2", "tailwindcss": "^3.3.2",
"typescript": "^4.9.5", "typescript": "^4.9.5",
"vite": "^4.3.4", "vite": "^4.3.9",
"vite-plugin-solid": "^2.7.0" "vite-plugin-solid": "^2.7.0"
} }
} }

750
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

719
src-tauri/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -36,6 +36,8 @@ tokio-stream = "0.1.14"
mdns-sd = "0.7.2" mdns-sd = "0.7.2"
futures = "0.3.28" futures = "0.3.28"
ddc-hi = "0.4.1" ddc-hi = "0.4.1"
coreaudio-rs = "0.11.2"
rust_swift_screencapture = { version = "0.1.1", path = "../../../../demo/rust-swift-screencapture" }
[features] [features]
# this feature is used for production builds or when `devPath` points to the filesystem # this feature is used for production builds or when `devPath` points to the filesystem

View File

@@ -1,7 +1,7 @@
use std::{borrow::BorrowMut, sync::Arc}; use std::{borrow::BorrowMut, sync::Arc};
use tauri::async_runtime::RwLock; use tauri::async_runtime::RwLock;
use tokio::sync::OnceCell; use tokio::{sync::OnceCell, task::yield_now};
use crate::ambient_light::{config, LedStripConfigGroup}; use crate::ambient_light::{config, LedStripConfigGroup};
@@ -9,7 +9,6 @@ use super::{Border, SamplePointMapper, ColorCalibration};
pub struct ConfigManager { pub struct ConfigManager {
config: Arc<RwLock<LedStripConfigGroup>>, config: Arc<RwLock<LedStripConfigGroup>>,
config_update_receiver: tokio::sync::watch::Receiver<LedStripConfigGroup>,
config_update_sender: tokio::sync::watch::Sender<LedStripConfigGroup>, config_update_sender: tokio::sync::watch::Sender<LedStripConfigGroup>,
} }
@@ -22,10 +21,12 @@ impl ConfigManager {
let (config_update_sender, config_update_receiver) = let (config_update_sender, config_update_receiver) =
tokio::sync::watch::channel(configs.clone()); tokio::sync::watch::channel(configs.clone());
config_update_sender.send(configs.clone()).unwrap(); if let Err(err) = config_update_sender.send(configs.clone()) {
log::error!("Failed to send config update when read config first time: {}", err);
}
drop(config_update_receiver);
ConfigManager { ConfigManager {
config: Arc::new(RwLock::new(configs)), config: Arc::new(RwLock::new(configs)),
config_update_receiver,
config_update_sender, config_update_sender,
} }
}) })
@@ -46,8 +47,9 @@ impl ConfigManager {
self.config_update_sender self.config_update_sender
.send(configs.clone()) .send(configs.clone())
.map_err(|e| anyhow::anyhow!("Failed to send config update: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to send config update: {}", e))?;
yield_now().await;
// log::info!("config updated: {:?}", configs); log::debug!("config updated: {:?}", configs);
Ok(()) Ok(())
} }
@@ -221,7 +223,7 @@ impl ConfigManager {
pub fn clone_config_update_receiver( pub fn clone_config_update_receiver(
&self, &self,
) -> tokio::sync::watch::Receiver<LedStripConfigGroup> { ) -> tokio::sync::watch::Receiver<LedStripConfigGroup> {
self.config_update_receiver.clone() self.config_update_sender.subscribe()
} }
pub async fn set_color_calibration(&self, color_calibration: ColorCalibration) -> anyhow::Result<()> { pub async fn set_color_calibration(&self, color_calibration: ColorCalibration) -> anyhow::Result<()> {

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc, time::Duration}; use std::{borrow::Borrow, collections::HashMap, sync::Arc, time::Duration};
use paris::warn; use paris::warn;
use tauri::async_runtime::RwLock; use tauri::async_runtime::RwLock;
@@ -11,8 +11,9 @@ use tokio::{
use crate::{ use crate::{
ambient_light::{config, ConfigManager}, ambient_light::{config, ConfigManager},
led_color::LedColor, led_color::LedColor,
screenshot::LedSamplePoints, rpc::UdpRpc,
screenshot_manager::{self, ScreenshotManager}, rpc::UdpRpc, screenshot::{self, LedSamplePoints},
screenshot_manager::{self, ScreenshotManager},
}; };
use itertools::Itertools; use itertools::Itertools;
@@ -48,37 +49,63 @@ impl LedColorsPublisher {
.await .await
} }
fn start_one_display_colors_fetcher( async fn start_one_display_colors_fetcher(
&self, &self,
display_id: u32, display_id: u32,
sample_points: Vec<Vec<LedSamplePoints>>, sample_points: Vec<LedSamplePoints>,
bound_scale_factor: f32, bound_scale_factor: f32,
mappers: Vec<SamplePointMapper>, mappers: Vec<SamplePointMapper>,
display_colors_tx: broadcast::Sender<(u32, Vec<u8>)>, display_colors_tx: broadcast::Sender<(u32, Vec<u8>)>,
) { ) {
let internal_tasks_version = self.inner_tasks_version.clone(); let internal_tasks_version = self.inner_tasks_version.clone();
let screenshot_manager = ScreenshotManager::global().await;
let screenshot_rx = screenshot_manager.subscribe_by_display_id(display_id).await;
if let Err(err) = screenshot_rx {
log::error!("{}", err);
return;
}
let mut screenshot_rx = screenshot_rx.unwrap();
tokio::spawn(async move { tokio::spawn(async move {
let colors = screenshot_manager::get_display_colors(
display_id,
&sample_points,
bound_scale_factor,
);
if let Err(err) = colors {
warn!("Failed to get colors: {}", err);
return;
}
let mut interval = tokio::time::interval(Duration::from_millis(33));
let init_version = internal_tasks_version.read().await.clone(); let init_version = internal_tasks_version.read().await.clone();
loop { while screenshot_rx.changed().await.is_ok() {
interval.tick().await; let screenshot = screenshot_rx.borrow().clone();
tokio::time::sleep(Duration::from_millis(1)).await; let colors = screenshot.get_colors_by_sample_points(&sample_points).await;
let colors_copy = colors.clone();
let mappers = mappers.clone();
match Self::send_colors_by_display(colors, mappers).await {
Ok(_) => {
// log::info!("sent colors: #{: >15}", display_id);
}
Err(err) => {
warn!("Failed to send colors: #{: >15}\t{}", display_id, err);
}
}
// match display_colors_tx.send((
// display_id,
// colors_copy
// .into_iter()
// .map(|color| color.get_rgb())
// .flatten()
// .collect::<Vec<_>>(),
// )) {
// Ok(_) => {
// // log::info!("sent colors: {:?}", color_len);
// }
// Err(err) => {
// warn!("Failed to send display_colors: {}", err);
// }
// };
// Check if the inner task version changed
let version = internal_tasks_version.read().await.clone(); let version = internal_tasks_version.read().await.clone();
if version != init_version { if version != init_version {
log::info!( log::info!(
"inner task version changed, stop. {} != {}", "inner task version changed, stop. {} != {}",
@@ -88,51 +115,6 @@ impl LedColorsPublisher {
break; break;
} }
let colors = screenshot_manager::get_display_colors(
display_id,
&sample_points,
bound_scale_factor,
);
if let Err(err) = colors {
warn!("Failed to get colors: {}", err);
sleep(Duration::from_millis(100)).await;
continue;
}
let colors: Vec<crate::led_color::LedColor> = colors.unwrap();
let colors_copy = colors.clone();
let mappers = mappers.clone();
tokio::spawn(async move {
match Self::send_colors_by_display(colors, mappers).await {
Ok(_) => {
// log::info!("sent colors: #{: >15}", display_id);
}
Err(err) => {
warn!("Failed to send colors: #{: >15}\t{}", display_id, err);
}
}
});
match display_colors_tx.send((
display_id,
colors_copy
.into_iter()
.map(|color| color.get_rgb())
.flatten()
.collect::<Vec<_>>(),
)) {
Ok(_) => {
// log::info!("sent colors: {:?}", color_len);
}
Err(err) => {
warn!("Failed to send display_colors: {}", err);
}
};
} }
}); });
} }
@@ -212,58 +194,61 @@ impl LedColorsPublisher {
}); });
} }
pub fn start(&self) { pub async fn start(&self) {
log::info!("start colors worker");
let config_manager = ConfigManager::global().await;
let mut config_receiver = config_manager.clone_config_update_receiver();
let configs = config_receiver.borrow().clone();
self.handle_config_change(configs).await;
log::info!("waiting for config update...");
while config_receiver.changed().await.is_ok() {
log::info!("config updated, restart inner tasks...");
let configs = config_receiver.borrow().clone();
self.handle_config_change(configs).await;
}
}
async fn handle_config_change(&self, configs: LedStripConfigGroup) {
let inner_tasks_version = self.inner_tasks_version.clone(); let inner_tasks_version = self.inner_tasks_version.clone();
let configs = Self::get_colors_configs(&configs).await;
tokio::spawn(async move { if let Err(err) = configs {
let publisher = Self::global().await; warn!("Failed to get configs: {}", err);
sleep(Duration::from_millis(100)).await;
return;
}
let config_manager = ConfigManager::global().await; let configs = configs.unwrap();
let mut config_receiver = config_manager.clone_config_update_receiver();
log::info!("waiting for config update..."); let mut inner_tasks_version = inner_tasks_version.write().await;
*inner_tasks_version = inner_tasks_version.overflowing_add(1).0;
drop(inner_tasks_version);
while config_receiver.changed().await.is_ok() { let (display_colors_tx, display_colors_rx) = broadcast::channel::<(u32, Vec<u8>)>(8);
log::info!("config updated, restart inner tasks...");
let configs = config_receiver.borrow().clone();
let configs = Self::get_colors_configs(&configs).await;
if let Err(err) = configs { for sample_point_group in configs.sample_point_groups.clone() {
warn!("Failed to get configs: {}", err); let display_id = sample_point_group.display_id;
sleep(Duration::from_millis(100)).await; let sample_points = sample_point_group.points;
continue; let bound_scale_factor = sample_point_group.bound_scale_factor;
} self.start_one_display_colors_fetcher(
display_id,
sample_points,
bound_scale_factor,
sample_point_group.mappers,
display_colors_tx.clone(),
)
.await;
}
let configs = configs.unwrap(); let display_ids = configs.sample_point_groups;
self.start_all_colors_worker(
let mut inner_tasks_version = inner_tasks_version.write().await; display_ids.iter().map(|c| c.display_id).collect(),
*inner_tasks_version = inner_tasks_version.overflowing_add(1).0; configs.mappers,
drop(inner_tasks_version); display_colors_rx,
);
let (display_colors_tx, display_colors_rx) =
broadcast::channel::<(u32, Vec<u8>)>(8);
for sample_point_group in configs.sample_point_groups.clone() {
let display_id = sample_point_group.display_id;
let sample_points = sample_point_group.points;
let bound_scale_factor = sample_point_group.bound_scale_factor;
publisher.start_one_display_colors_fetcher(
display_id,
sample_points,
bound_scale_factor,
sample_point_group.mappers,
display_colors_tx.clone(),
);
}
let display_ids = configs.sample_point_groups;
publisher.start_all_colors_worker(
display_ids.iter().map(|c| c.display_id).collect(),
configs.mappers,
display_colors_rx,
);
}
});
} }
pub async fn send_colors(offset: u16, mut payload: Vec<u8>) -> anyhow::Result<()> { pub async fn send_colors(offset: u16, mut payload: Vec<u8>) -> anyhow::Result<()> {
@@ -402,6 +387,7 @@ impl LedColorsPublisher {
let points: Vec<_> = led_strip_configs let points: Vec<_> = led_strip_configs
.clone() .clone()
.map(|(_, config)| screenshot.get_sample_points(&config)) .map(|(_, config)| screenshot.get_sample_points(&config))
.flatten()
.collect(); .collect();
if points.len() == 0 { if points.len() == 0 {
@@ -451,7 +437,7 @@ pub struct AllColorConfig {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct DisplaySamplePointGroup { pub struct DisplaySamplePointGroup {
pub display_id: u32, pub display_id: u32,
pub points: Vec<Vec<LedSamplePoints>>, pub points: Vec<LedSamplePoints>,
pub bound_scale_factor: f32, pub bound_scale_factor: f32,
pub mappers: Vec<config::SamplePointMapper>, pub mappers: Vec<config::SamplePointMapper>,
} }

View File

@@ -0,0 +1,96 @@
use std::{sync::Arc, time::SystemTime};
use ddc_hi::{Ddc, Display};
use tokio::sync::RwLock;
use super::DisplayState;
pub struct DisplayHandler {
pub state: Arc<RwLock<DisplayState>>,
pub controller: Arc<RwLock<Display>>,
}
impl DisplayHandler {
pub async fn fetch_state(&self) {
let mut controller = self.controller.write().await;
let mut temp_state = self.state.read().await.clone();
match controller.handle.get_vcp_feature(0x10) {
Ok(value) => {
temp_state.max_brightness = value.maximum();
temp_state.min_brightness = 0;
temp_state.brightness = value.value();
}
Err(_) => {}
};
match controller.handle.get_vcp_feature(0x12) {
Ok(value) => {
temp_state.max_contrast = value.maximum();
temp_state.min_contrast = 0;
temp_state.contrast = value.value();
}
Err(_) => {}
};
match controller.handle.get_vcp_feature(0xdc) {
Ok(value) => {
temp_state.max_mode = value.maximum();
temp_state.min_mode = 0;
temp_state.mode = value.value();
}
Err(_) => {}
};
temp_state.last_fetched_at = SystemTime::now();
let mut state = self.state.write().await;
*state = temp_state;
}
pub async fn set_brightness(&self, brightness: u16) -> anyhow::Result<()> {
let mut controller = self.controller.write().await;
let mut state = self.state.write().await;
controller
.handle
.set_vcp_feature(0x10, brightness)
.map_err(|err| anyhow::anyhow!("can not set brightness. {:?}", err))?;
state.brightness = brightness;
state.last_modified_at = SystemTime::now();
Ok(())
}
pub async fn set_contrast(&self, contrast: u16) -> anyhow::Result<()> {
let mut controller = self.controller.write().await;
let mut state = self.state.write().await;
controller
.handle
.set_vcp_feature(0x12, contrast)
.map_err(|err| anyhow::anyhow!("can not set contrast. {:?}", err))?;
state.contrast = contrast;
state.last_modified_at = SystemTime::now();
Ok(())
}
pub async fn set_mode(&self, mode: u16) -> anyhow::Result<()> {
let mut controller = self.controller.write().await;
let mut state = self.state.write().await;
controller
.handle
.set_vcp_feature(0xdc, mode)
.map_err(|err| anyhow::anyhow!("can not set mode. {:?}", err))?;
state.mode = mode;
state.last_modified_at = SystemTime::now();
Ok(())
}
}

View File

@@ -14,6 +14,7 @@ pub struct DisplayState {
pub max_mode: u16, pub max_mode: u16,
pub min_mode: u16, pub min_mode: u16,
pub last_modified_at: SystemTime, pub last_modified_at: SystemTime,
pub last_fetched_at: SystemTime,
} }
impl DisplayState { impl DisplayState {
@@ -22,13 +23,26 @@ impl DisplayState {
brightness: 30, brightness: 30,
contrast: 50, contrast: 50,
mode: 0, mode: 0,
last_modified_at: SystemTime::now(), last_modified_at: SystemTime::UNIX_EPOCH,
max_brightness: 100, max_brightness: 100,
min_brightness: 0, min_brightness: 0,
max_contrast: 100, max_contrast: 100,
min_contrast: 0, min_contrast: 0,
max_mode: 15, max_mode: 15,
min_mode: 0, min_mode: 0,
last_fetched_at: SystemTime::UNIX_EPOCH,
} }
} }
} }
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct DisplayStateWrapper {
pub version: u8,
pub states: Vec<DisplayState>,
}
impl DisplayStateWrapper {
pub fn new(states: Vec<DisplayState>) -> Self {
Self { version: 1, states }
}
}

View File

@@ -1,25 +1,27 @@
use std::{ use std::{env::current_dir, sync::Arc, time::Duration};
borrow::Borrow,
collections::HashMap,
ops::Sub,
sync::Arc,
time::{Duration, SystemTime},
};
use ddc_hi::Display; use ddc_hi::Display;
use paris::{error, info, warn}; use paris::{error, info, warn};
use tokio::sync::{OnceCell, OwnedMutexGuard, RwLock}; use tauri::api::path::config_dir;
use tokio::{
sync::{broadcast, watch, OnceCell, RwLock},
task::yield_now,
};
use super::display_state::DisplayState; use crate::{
use ddc_hi::Ddc; display::DisplayStateWrapper,
rpc::{BoardMessageChannels, DisplaySetting},
};
pub struct DisplayHandler { use super::{display_handler::DisplayHandler, display_state::DisplayState};
pub state: Arc<RwLock<DisplayState>>,
pub controller: Arc<RwLock<Display>>, const CONFIG_FILE_NAME: &str = "cc.ivanli.ambient_light/displays.toml";
}
pub struct DisplayManager { pub struct DisplayManager {
displays: Arc<RwLock<Vec<Arc<RwLock<DisplayHandler>>>>>, displays: Arc<RwLock<Vec<Arc<RwLock<DisplayHandler>>>>>,
setting_request_handler: Option<tokio::task::JoinHandle<()>>,
displays_changed_sender: Arc<watch::Sender<Vec<DisplayState>>>,
auto_save_state_handler: Option<tokio::task::JoinHandle<()>>,
} }
impl DisplayManager { impl DisplayManager {
@@ -30,13 +32,52 @@ impl DisplayManager {
} }
pub async fn create() -> Self { pub async fn create() -> Self {
let instance = Self { let (displays_changed_sender, _) = watch::channel(Vec::new());
let displays_changed_sender = Arc::new(displays_changed_sender);
let mut instance = Self {
displays: Arc::new(RwLock::new(Vec::new())), displays: Arc::new(RwLock::new(Vec::new())),
setting_request_handler: None,
displays_changed_sender,
auto_save_state_handler: None,
}; };
instance.fetch_displays().await; instance.fetch_displays().await;
instance.restore_states().await;
instance.fetch_state_of_displays().await;
instance.subscribe_setting_request();
instance.auto_save_state_of_displays();
instance instance
} }
fn auto_save_state_of_displays(&mut self) {
let displays = self.displays.clone();
let handler = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
Self::save_states(displays.clone()).await;
Self::send_displays_changed(displays.clone()).await;
}
});
self.auto_save_state_handler = Some(handler);
}
async fn send_displays_changed(displays: Arc<RwLock<Vec<Arc<RwLock<DisplayHandler>>>>>) {
let mut states = Vec::new();
for display in displays.read().await.iter() {
let state = display.read().await.state.read().await.clone();
states.push(state);
}
let channel = BoardMessageChannels::global().await;
let tx = channel.displays_changed_sender.clone();
if let Err(err) = tx.send(states) {
error!("Failed to send displays changed: {}", err);
}
}
async fn fetch_displays(&self) { async fn fetch_displays(&self) {
let mut displays = self.displays.write().await; let mut displays = self.displays.write().await;
displays.clear(); displays.clear();
@@ -46,7 +87,21 @@ impl DisplayManager {
for display in controllers { for display in controllers {
let controller = Arc::new(RwLock::new(display)); let controller = Arc::new(RwLock::new(display));
let state = Arc::new(RwLock::new(DisplayState::default())); let state = Arc::new(RwLock::new(DisplayState::default()));
displays.push(Arc::new(RwLock::new(DisplayHandler { controller, state }))); let handler = DisplayHandler {
state: state.clone(),
controller: controller.clone(),
};
displays.push(Arc::new(RwLock::new(handler)));
}
}
async fn fetch_state_of_displays(&self) {
let displays = self.displays.read().await;
for display in displays.iter() {
let display = display.read().await;
display.fetch_state().await;
} }
} }
@@ -60,151 +115,168 @@ impl DisplayManager {
states states
} }
// pub async fn subscribe_display_brightness(&self) { fn subscribe_setting_request(&mut self) {
// let rpc = rpc::Manager::global().await; let displays = self.displays.clone();
let displays_changed_sender = self.displays_changed_sender.clone();
let handler = tokio::spawn(async move {
let channels = BoardMessageChannels::global().await;
let mut request_rx = channels.display_setting_request_sender.subscribe();
// let mut rx = rpc.client().subscribe_change_display_brightness_rx(); loop {
if let Err(err) = request_rx.recv().await {
match err {
broadcast::error::RecvError::Closed => {
info!("display setting request channel closed");
break;
}
broadcast::error::RecvError::Lagged(_) => {
warn!("display setting request channel lagged");
continue;
}
}
}
// loop { let message = request_rx.recv().await.unwrap();
// if let Ok(display_brightness) = rx.recv().await {
// if let Err(err) = self.set_display_brightness(display_brightness).await {
// error!("set_display_brightness failed. {:?}", err);
// }
// }
// }
// }
// fn read_display_config_by_ddc(index: usize) -> anyhow::Result<DisplayState> { let displays = displays.write().await;
// let mut displays = Display::enumerate();
// match displays.get_mut(index) {
// Some(display) => {
// let mut config = DisplayState::default(index);
// match display.handle.get_vcp_feature(0x10) {
// Ok(value) => {
// config.max_brightness = value.maximum();
// config.min_brightness = 0;
// config.brightness = value.value();
// }
// Err(_) => {}
// };
// match display.handle.get_vcp_feature(0x12) {
// Ok(value) => {
// config.max_contrast = value.maximum();
// config.min_contrast = 0;
// config.contrast = value.value();
// }
// Err(_) => {}
// };
// match display.handle.get_vcp_feature(0xdc) {
// Ok(value) => {
// config.max_mode = value.maximum();
// config.min_mode = 0;
// config.mode = value.value();
// }
// Err(_) => {}
// };
// Ok(config) let display = displays.get(message.display_index);
// } if display.is_none() {
// None => anyhow::bail!("display#{} is missed.", index), warn!("display#{} not found", message.display_index);
// } continue;
// } }
// async fn get_display(&self, index: usize) -> anyhow::Result<OwnedMutexGuard<DisplayState>> { let display = display.unwrap().write().await;
// let mut displays = self.displays.lock().await; let result = match message.setting {
// match displays.get_mut(&index) { DisplaySetting::Brightness(value) => display.set_brightness(value as u16).await,
// Some(config) => { DisplaySetting::Contrast(value) => display.set_contrast(value as u16).await,
// let mut config = config.to_owned().lock_owned().await; DisplaySetting::Mode(value) => display.set_mode(value as u16).await,
// if config.last_modified_at > SystemTime::now().sub(Duration::from_secs(10)) { };
// info!("cached");
// return Ok(config);
// }
// return match Self::read_display_config_by_ddc(index) {
// Ok(config) => {
// let id = config.id;
// let value = Arc::new(Mutex::new(config));
// let valueGuard = value.clone().lock_owned().await;
// displays.insert(id, value);
// info!("read form ddc");
// Ok(valueGuard)
// }
// Err(err) => {
// warn!(
// "can not read config from display by ddc, use CACHED value. {:?}",
// err
// );
// config.last_modified_at = SystemTime::now();
// Ok(config)
// }
// };
// }
// None => {
// let config = Self::read_display_config_by_ddc(index).map_err(|err| {
// anyhow::anyhow!(
// "can not read config from display by ddc,use DEFAULT value. {:?}",
// err
// )
// })?;
// let id = config.id;
// let value = Arc::new(Mutex::new(config));
// let valueGuard = value.clone().lock_owned().await;
// displays.insert(id, value);
// Ok(valueGuard)
// }
// }
// }
// pub async fn set_display_brightness( if let Err(err) = result {
// &self, error!("failed to set display setting: {}", err);
// display_brightness: DisplayBrightness, continue;
// ) -> anyhow::Result<()> { }
// match Display::enumerate().get_mut(display_brightness.display_index) {
// Some(display) => {
// match self.get_display(display_brightness.display_index).await {
// Ok(mut config) => {
// let curr = config.brightness;
// info!("curr_brightness: {:?}", curr);
// let mut target = match display_brightness.brightness {
// Brightness::Relative(v) => curr.wrapping_add_signed(v),
// Brightness::Absolute(v) => v,
// };
// if target.gt(&config.max_brightness) {
// target = config.max_brightness;
// } else if target.lt(&config.min_brightness) {
// target = config.min_brightness;
// }
// config.brightness = target;
// display
// .handle
// .set_vcp_feature(0x10, target as u16)
// .map_err(|err| anyhow::anyhow!("can not set brightness. {:?}", err))?;
// let rpc = rpc::Manager::global().await; drop(display);
// rpc.publish_desktop_cmd( let mut states = Vec::new();
// format!("display{}/brightness", display_brightness.display_index) for display in displays.iter() {
// .as_str(), let state = display.read().await.state.read().await.clone();
// target.to_be_bytes().to_vec(), states.push(state);
// ) }
// .await;
// } if let Err(err) = displays_changed_sender.send(states) {
// Err(err) => { error!("failed to send displays changed event: {}", err);
// info!( }
// "can not get display#{} brightness. {:?}", yield_now().await;
// display_brightness.display_index, err }
// ); });
// if let Brightness::Absolute(v) = display_brightness.brightness {
// display.handle.set_vcp_feature(0x10, v).map_err(|err| { self.setting_request_handler = Some(handler);
// anyhow::anyhow!("can not set brightness. {:?}", err) }
// })?;
// }; async fn restore_states(&self) {
// } let path = config_dir()
// }; .unwrap_or(current_dir().unwrap())
// } .join(CONFIG_FILE_NAME);
// None => {
// warn!("display#{} is not found.", display_brightness.display_index); if !path.exists() {
// } log::info!("config file not found: {}. skip read.", path.display());
// } return;
// Ok(()) }
// }
let text = std::fs::read_to_string(path);
if let Err(err) = text {
log::error!("failed to read config file: {}", err);
return;
}
let text = text.unwrap();
let wrapper = toml::from_str::<DisplayStateWrapper>(&text);
if let Err(err) = wrapper {
log::error!("failed to parse display states file: {}", err);
return;
}
let states = wrapper.unwrap().states;
let displays = self.displays.read().await;
for (index, display) in displays.iter().enumerate() {
let display = display.read().await;
let mut state = display.state.write().await;
let saved = states.get(index);
if let Some(saved) = saved {
state.brightness = saved.brightness;
state.contrast = saved.contrast;
state.mode = saved.mode;
log::info!("restore display config. display#{}: {:?}", index, state);
}
}
log::info!(
"restore display config. store displays: {}, online displays: {}",
states.len(),
displays.len()
);
}
async fn save_states(displays: Arc<RwLock<Vec<Arc<RwLock<DisplayHandler>>>>>) {
let path = config_dir()
.unwrap_or(current_dir().unwrap())
.join(CONFIG_FILE_NAME);
let displays = displays.read().await;
let mut states = Vec::new();
for display in displays.iter() {
let state = display.read().await.state.read().await.clone();
states.push(state);
}
let wrapper = DisplayStateWrapper::new(states);
let text = toml::to_string(&wrapper);
if let Err(err) = text {
log::error!("failed to serialize display states: {}", err);
log::error!("display states: {:?}", &wrapper);
return;
}
let text = text.unwrap();
if path.exists() {
if let Err(err) = std::fs::remove_file(&path) {
log::error!("failed to remove old config file: {}", err);
return;
}
}
if let Err(err) = std::fs::write(&path, text) {
log::error!("failed to write config file: {}", err);
return;
}
log::debug!(
"save display config. store displays: {}, online displays: {}",
wrapper.states.len(),
displays.len()
);
}
pub fn subscribe_displays_changed(&self) -> watch::Receiver<Vec<DisplayState>> {
self.displays_changed_sender.subscribe()
}
}
impl Drop for DisplayManager {
fn drop(&mut self) {
log::info!("dropping display manager=============");
if let Some(handler) = self.setting_request_handler.take() {
handler.abort();
}
if let Some(handler) = self.auto_save_state_handler.take() {
handler.abort();
}
}
} }

View File

@@ -2,6 +2,7 @@
// mod manager; // mod manager;
mod display_state; mod display_state;
mod manager; mod manager;
mod display_handler;
pub use display_state::*; pub use display_state::*;

View File

@@ -5,19 +5,21 @@ mod ambient_light;
mod display; mod display;
mod led_color; mod led_color;
mod rpc; mod rpc;
pub mod screenshot; mod screenshot;
mod screenshot_manager; mod screenshot_manager;
mod volume;
use ambient_light::{Border, ColorCalibration, LedStripConfig, LedStripConfigGroup}; use ambient_light::{Border, ColorCalibration, LedStripConfig, LedStripConfigGroup};
use display::{DisplayManager, DisplayState}; use display::{DisplayManager, DisplayState};
use display_info::DisplayInfo; use display_info::DisplayInfo;
use paris::{error, info, warn}; use paris::{error, info, warn};
use rpc::{BoardInfo, MqttRpc, UdpRpc}; use rpc::{BoardInfo, UdpRpc};
use screenshot::Screenshot; use screenshot::Screenshot;
use screenshot_manager::ScreenshotManager; use screenshot_manager::ScreenshotManager;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::to_string; use serde_json::to_string;
use tauri::{http::ResponseBuilder, regex, Manager}; use tauri::{http::ResponseBuilder, regex, Manager};
use volume::VolumeManager;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(remote = "DisplayInfo")] #[serde(remote = "DisplayInfo")]
@@ -86,7 +88,7 @@ async fn get_led_strips_sample_points(
let screenshot_manager = ScreenshotManager::global().await; let screenshot_manager = ScreenshotManager::global().await;
let channels = screenshot_manager.channels.read().await; let channels = screenshot_manager.channels.read().await;
if let Some(rx) = channels.get(&config.display_id) { if let Some(rx) = channels.get(&config.display_id) {
let rx = rx.clone(); let rx = rx.read().await;
let screenshot = rx.borrow().clone(); let screenshot = rx.borrow().clone();
let sample_points = screenshot.get_sample_points(&config); let sample_points = screenshot.get_sample_points(&config);
Ok(sample_points) Ok(sample_points)
@@ -103,7 +105,7 @@ async fn get_one_edge_colors(
let screenshot_manager = ScreenshotManager::global().await; let screenshot_manager = ScreenshotManager::global().await;
let channels = screenshot_manager.channels.read().await; let channels = screenshot_manager.channels.read().await;
if let Some(rx) = channels.get(&display_id) { if let Some(rx) = channels.get(&display_id) {
let rx = rx.clone(); let rx = rx.read().await;
let screenshot = rx.borrow().clone(); let screenshot = rx.borrow().clone();
let bytes = screenshot.bytes.read().await.to_owned(); let bytes = screenshot.bytes.read().await.to_owned();
let colors = let colors =
@@ -215,13 +217,19 @@ async fn get_displays() -> Vec<DisplayState> {
async fn main() { async fn main() {
env_logger::init(); env_logger::init();
let screenshot_manager = ScreenshotManager::global().await; tokio::spawn(async move {
screenshot_manager.start().unwrap(); let screenshot_manager = ScreenshotManager::global().await;
screenshot_manager.start().await.unwrap_or_else(|e| {
error!("can not start screenshot manager: {}", e);
})
});
let led_color_publisher = ambient_light::LedColorsPublisher::global().await; tokio::spawn(async move {
led_color_publisher.start(); let led_color_publisher = ambient_light::LedColorsPublisher::global().await;
led_color_publisher.start().await;
});
let _mqtt = MqttRpc::global().await; let _volume = VolumeManager::global().await;
tauri::Builder::default() tauri::Builder::default()
.invoke_handler(tauri::generate_handler![ .invoke_handler(tauri::generate_handler![
@@ -280,77 +288,87 @@ async fn main() {
let bytes = tokio::task::block_in_place(move || { let bytes = tokio::task::block_in_place(move || {
tauri::async_runtime::block_on(async move { tauri::async_runtime::block_on(async move {
let screenshot_manager = ScreenshotManager::global().await; let screenshot_manager = ScreenshotManager::global().await;
let channels = screenshot_manager.channels.read().await; let rx: Result<tokio::sync::watch::Receiver<Screenshot>, anyhow::Error> =
if let Some(rx) = channels.get(&display_id) { screenshot_manager.subscribe_by_display_id(display_id).await;
let rx = rx.clone();
let screenshot = rx.borrow().clone();
let bytes = screenshot.bytes.read().await;
let (scale_factor_x, scale_factor_y, width, height) = if url.query.is_some() if let Err(err) = rx {
&& url.query.as_ref().unwrap().contains_key("height") anyhow::bail!("Display#{}: not found. {}", display_id, err);
&& url.query.as_ref().unwrap().contains_key("width") }
{ let mut rx = rx.unwrap();
let width = url.query.as_ref().unwrap()["width"]
.parse::<u32>() if rx.changed().await.is_err() {
.map_err(|err| { anyhow::bail!("Display#{}: no more screenshot.", display_id);
warn!("width parse error: {}", err); }
err let screenshot = rx.borrow().clone();
})?; let bytes = screenshot.bytes.read().await;
let height = url.query.as_ref().unwrap()["height"] if bytes.len() == 0 {
.parse::<u32>() anyhow::bail!("Display#{}: no screenshot.", display_id);
.map_err(|err| { }
warn!("height parse error: {}", err);
err log::debug!("Display#{}: screenshot size: {}", display_id, bytes.len());
})?;
( let (scale_factor_x, scale_factor_y, width, height) = if url.query.is_some()
screenshot.width as f32 / width as f32, && url.query.as_ref().unwrap().contains_key("height")
screenshot.height as f32 / height as f32, && url.query.as_ref().unwrap().contains_key("width")
width, {
height, let width = url.query.as_ref().unwrap()["width"]
) .parse::<u32>()
} else { .map_err(|err| {
log::debug!("scale by scale_factor"); warn!("width parse error: {}", err);
let scale_factor = screenshot.scale_factor; err
( })?;
scale_factor, let height = url.query.as_ref().unwrap()["height"]
scale_factor, .parse::<u32>()
(screenshot.width as f32 / scale_factor) as u32, .map_err(|err| {
(screenshot.height as f32 / scale_factor) as u32, warn!("height parse error: {}", err);
) err
}; })?;
log::debug!( (
"scale by query. width: {}, height: {}, scale_factor: {}, len: {}", screenshot.width as f32 / width as f32,
screenshot.height as f32 / height as f32,
width, width,
height, height,
screenshot.width as f32 / width as f32, )
width * height * 4,
);
let bytes_per_row = screenshot.bytes_per_row as f32;
let mut rgba_buffer = vec![0u8; (width * height * 4) as usize];
for y in 0..height {
for x in 0..width {
let offset = ((y as f32) * scale_factor_y).floor() as usize
* bytes_per_row as usize
+ ((x as f32) * scale_factor_x).floor() as usize * 4;
let b = bytes[offset];
let g = bytes[offset + 1];
let r = bytes[offset + 2];
let a = bytes[offset + 3];
let offset_2 = (y * width + x) as usize * 4;
rgba_buffer[offset_2] = r;
rgba_buffer[offset_2 + 1] = g;
rgba_buffer[offset_2 + 2] = b;
rgba_buffer[offset_2 + 3] = a;
}
}
Ok(rgba_buffer.clone())
} else { } else {
anyhow::bail!("Display#{}: not found", display_id); log::debug!("scale by scale_factor");
let scale_factor = screenshot.scale_factor;
(
scale_factor,
scale_factor,
(screenshot.width as f32 / scale_factor) as u32,
(screenshot.height as f32 / scale_factor) as u32,
)
};
log::debug!(
"scale by query. width: {}, height: {}, scale_factor: {}, len: {}",
width,
height,
screenshot.width as f32 / width as f32,
width * height * 4,
);
let bytes_per_row = screenshot.bytes_per_row as f32;
let mut rgba_buffer = vec![0u8; (width * height * 4) as usize];
for y in 0..height {
for x in 0..width {
let offset = ((y as f32) * scale_factor_y).floor() as usize
* bytes_per_row as usize
+ ((x as f32) * scale_factor_x).floor() as usize * 4;
let b = bytes[offset];
let g = bytes[offset + 1];
let r = bytes[offset + 2];
let a = bytes[offset + 3];
let offset_2 = (y * width + x) as usize * 4;
rgba_buffer[offset_2] = r;
rgba_buffer[offset_2 + 1] = g;
rgba_buffer[offset_2 + 2] = b;
rgba_buffer[offset_2 + 3] = a;
}
} }
Ok(rgba_buffer.clone())
}) })
}); });
@@ -371,8 +389,7 @@ async fn main() {
let app_handle = app.handle().clone(); let app_handle = app.handle().clone();
tokio::spawn(async move { tokio::spawn(async move {
let config_manager = ambient_light::ConfigManager::global().await; let config_manager = ambient_light::ConfigManager::global().await;
let config_update_receiver = config_manager.clone_config_update_receiver(); let mut config_update_receiver = config_manager.clone_config_update_receiver();
let mut config_update_receiver = config_update_receiver;
loop { loop {
if let Err(err) = config_update_receiver.changed().await { if let Err(err) = config_update_receiver.changed().await {
error!("config update receiver changed error: {}", err); error!("config update receiver changed error: {}", err);
@@ -450,6 +467,20 @@ async fn main() {
} }
}); });
let app_handle = app.handle().clone();
tokio::spawn(async move {
let display_manager = DisplayManager::global().await;
let mut rx = display_manager.subscribe_displays_changed();
while rx.changed().await.is_ok() {
let displays = rx.borrow().clone();
log::info!("displays changed. emit displays_changed event.");
app_handle.emit_all("displays_changed", displays).unwrap();
}
});
Ok(()) Ok(())
}) })
.run(tauri::generate_context!()) .run(tauri::generate_context!())

View File

@@ -1,15 +1,24 @@
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use paris::{info, warn, error}; use paris::{error, info, warn};
use tokio::{net::UdpSocket, sync::RwLock, time::timeout, io}; use tokio::{io, net::UdpSocket, sync::RwLock, task::yield_now, time::timeout};
use super::{BoardConnectStatus, BoardInfo}; use crate::{
ambient_light::{ConfigManager, LedStripConfig},
rpc::DisplaySettingRequest,
volume::{self, VolumeManager},
};
use super::{BoardConnectStatus, BoardInfo, BoardMessageChannels};
#[derive(Debug)] #[derive(Debug)]
pub struct Board { pub struct Board {
pub info: Arc<RwLock<BoardInfo>>, pub info: Arc<RwLock<BoardInfo>>,
socket: Option<Arc<UdpSocket>>, socket: Option<Arc<UdpSocket>>,
listen_handler: Option<tokio::task::JoinHandle<()>>, listen_handler: Option<tokio::task::JoinHandle<()>>,
volume_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
state_of_displays_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
led_strip_config_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
} }
impl Board { impl Board {
@@ -18,31 +27,55 @@ impl Board {
info: Arc::new(RwLock::new(info)), info: Arc::new(RwLock::new(info)),
socket: None, socket: None,
listen_handler: None, listen_handler: None,
volume_changed_subscriber_handler: None,
state_of_displays_changed_subscriber_handler: None,
led_strip_config_changed_subscriber_handler: None,
} }
} }
pub async fn init_socket(&mut self) -> anyhow::Result<()> { pub async fn init_socket(&mut self) -> anyhow::Result<()> {
let info = self.info.read().await; let info = self.info.clone();
let info = info.read().await;
let socket = UdpSocket::bind("0.0.0.0:0").await?; let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket.connect((info.address, info.port)).await?; socket.connect((info.address, info.port)).await?;
let socket = Arc::new(socket); let socket = Arc::new(socket);
self.socket = Some(socket.clone()); self.socket = Some(socket.clone());
let info = self.info.clone(); let handler = tokio::spawn(async move {
let handler=tokio::spawn(async move {
let mut buf = [0u8; 128]; let mut buf = [0u8; 128];
if let Err(err) = socket.readable().await {
error!("socket read error: {:?}", err); let board_message_channels = crate::rpc::channels::BoardMessageChannels::global().await;
return;
} let display_setting_request_sender = board_message_channels
.display_setting_request_sender
.clone();
let volume_setting_request_sender =
board_message_channels.volume_setting_request_sender.clone();
loop { loop {
match socket.try_recv(&mut buf) { match socket.recv(&mut buf).await {
Ok(len) => { Ok(len) => {
log::info!("recv: {:?}", &buf[..len]); log::info!("recv: {:?}", &buf[..len]);
if buf[0] == 3 {
let result =
display_setting_request_sender.send(DisplaySettingRequest {
display_index: buf[1] as usize,
setting: crate::rpc::DisplaySetting::Brightness(buf[2]),
});
if let Err(err) = result {
error!("send display setting request to channel failed: {:?}", err);
}
} else if buf[0] == 4 {
let result = volume_setting_request_sender.send(buf[1] as f32 / 100.0);
if let Err(err) = result {
error!("send volume setting request to channel failed: {:?}", err);
}
}
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
yield_now().await;
continue; continue;
} }
Err(e) => { Err(e) => {
@@ -54,9 +87,156 @@ impl Board {
}); });
self.listen_handler = Some(handler); self.listen_handler = Some(handler);
self.subscribe_volume_changed().await;
self.subscribe_state_of_displays_changed().await;
self.subscribe_led_strip_config_changed().await;
Ok(()) Ok(())
} }
async fn subscribe_volume_changed(&mut self) {
let channel = BoardMessageChannels::global().await;
let mut volume_changed_rx = channel.volume_changed_sender.subscribe();
let info = self.info.clone();
let socket = self.socket.clone();
let handler = tokio::spawn(async move {
loop {
let volume: Result<f32, tokio::sync::broadcast::error::RecvError> =
volume_changed_rx.recv().await;
if let Err(err) = volume {
match err {
tokio::sync::broadcast::error::RecvError::Closed => {
log::error!("volume changed channel closed");
break;
}
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
log::info!("volume changed channel lagged");
continue;
}
}
}
let volume = volume.unwrap();
let info = info.read().await;
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
log::info!("board is not connected, skip send volume changed");
continue;
}
let socket = socket.as_ref().unwrap();
let mut buf = [0u8; 2];
buf[0] = 4;
buf[1] = (volume * 100.0) as u8;
if let Err(err) = socket.send(&buf).await {
log::warn!("send volume changed failed: {:?}", err);
}
}
});
let volume_manager = VolumeManager::global().await;
let volume = volume_manager.get_volume().await;
if let Some(socket) = self.socket.as_ref() {
let buf = [4, (volume * 100.0) as u8];
if let Err(err) = socket.send(&buf).await {
log::warn!("send volume failed: {:?}", err);
}
} else {
log::warn!("socket is none, skip send volume");
}
self.volume_changed_subscriber_handler = Some(handler);
}
async fn subscribe_state_of_displays_changed(&mut self) {
let channel: &BoardMessageChannels = BoardMessageChannels::global().await;
let mut state_of_displays_changed_rx = channel.displays_changed_sender.subscribe();
let info = self.info.clone();
let socket = self.socket.clone();
let handler = tokio::spawn(async move {
loop {
let states: Result<
Vec<crate::display::DisplayState>,
tokio::sync::broadcast::error::RecvError,
> = state_of_displays_changed_rx.recv().await;
if let Err(err) = states {
match err {
tokio::sync::broadcast::error::RecvError::Closed => {
log::error!("state of displays changed channel closed");
break;
}
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
log::info!("state of displays changed channel lagged");
continue;
}
}
}
let info = info.read().await;
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
log::info!("board is not connected, skip send state of displays changed");
continue;
}
let socket = socket.as_ref().unwrap();
let mut buf = [0u8; 3];
let states = states.unwrap();
for (index, state) in states.iter().enumerate() {
buf[0] = 3;
buf[1] = index as u8;
buf[2] = state.brightness as u8;
log::info!("send state of displays changed: {:?}", &buf[..]);
if let Err(err) = socket.send(&buf).await {
log::warn!("send state of displays changed failed: {:?}", err);
}
}
}
});
self.state_of_displays_changed_subscriber_handler = Some(handler);
}
async fn subscribe_led_strip_config_changed(&mut self) {
let config_manager = ConfigManager::global().await;
let mut led_strip_config_changed_rx = config_manager.clone_config_update_receiver();
let info = self.info.clone();
let socket = self.socket.clone();
let handler = tokio::spawn(async move {
while led_strip_config_changed_rx.changed().await.is_ok() {
let config = led_strip_config_changed_rx.borrow().clone();
let info = info.read().await;
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
log::info!("board is not connected, skip send led strip config changed");
continue;
}
let socket = socket.as_ref().unwrap();
let mut buf = [0u8; 4];
buf[0] = 5;
buf[1..].copy_from_slice(&config.color_calibration.to_bytes());
log::info!("send led strip config changed: {:?}", &buf[..]);
if let Err(err) = socket.send(&buf).await {
log::warn!("send led strip config changed failed: {:?}", err);
}
}
});
self.led_strip_config_changed_subscriber_handler = Some(handler);
}
pub async fn send_colors(&self, buf: &[u8]) { pub async fn send_colors(&self, buf: &[u8]) {
let info = self.info.read().await; let info = self.info.read().await;
if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected { if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
@@ -125,15 +305,24 @@ impl Board {
} }
} }
impl Drop for Board { impl Drop for Board {
fn drop(&mut self) { fn drop(&mut self) {
info!("board drop");
if let Some(handler) = self.listen_handler.take() { if let Some(handler) = self.listen_handler.take() {
info!("aborting listen handler"); handler.abort();
tokio::task::block_in_place(move || { }
handler.abort();
}); if let Some(handler) = self.volume_changed_subscriber_handler.take() {
info!("listen handler aborted"); handler.abort();
}
if let Some(handler) = self.state_of_displays_changed_subscriber_handler.take() {
handler.abort();
}
if let Some(handler) = self.led_strip_config_changed_subscriber_handler.take() {
handler.abort();
} }
} }
} }

View File

@@ -0,0 +1,43 @@
use std::sync::Arc;
use tokio::sync::{broadcast, OnceCell};
use crate::display::DisplayState;
use super::DisplaySettingRequest;
pub struct BoardMessageChannels {
pub display_setting_request_sender: Arc<broadcast::Sender<DisplaySettingRequest>>,
pub volume_setting_request_sender: Arc<broadcast::Sender<f32>>,
pub volume_changed_sender: Arc<broadcast::Sender<f32>>,
pub displays_changed_sender: Arc<broadcast::Sender<Vec<DisplayState>>>,
}
impl BoardMessageChannels {
pub async fn global() -> &'static Self {
static BOARD_MESSAGE_CHANNELS: OnceCell<BoardMessageChannels> = OnceCell::const_new();
BOARD_MESSAGE_CHANNELS.get_or_init(|| async {Self::new()}).await
}
pub fn new() -> Self {
let (display_setting_request_sender, _) = broadcast::channel(16);
let display_setting_request_sender = Arc::new(display_setting_request_sender);
let (volume_setting_request_sender, _) = broadcast::channel(16);
let volume_setting_request_sender = Arc::new(volume_setting_request_sender);
let (volume_changed_sender, _) = broadcast::channel(2);
let volume_changed_sender = Arc::new(volume_changed_sender);
let (displays_changed_sender, _) = broadcast::channel(2);
let displays_changed_sender = Arc::new(displays_changed_sender);
Self {
display_setting_request_sender,
volume_setting_request_sender,
volume_changed_sender,
displays_changed_sender,
}
}
}

View File

@@ -0,0 +1,13 @@
#[derive(Clone, Debug)]
pub enum DisplaySetting {
Brightness(u8),
Contrast(u8),
Mode(u8),
}
#[derive(Clone, Debug)]
pub struct DisplaySettingRequest {
pub display_index: usize,
pub setting: DisplaySetting,
}

View File

@@ -1,9 +1,11 @@
mod board_info; mod board_info;
mod mqtt;
mod udp; mod udp;
mod board; mod board;
mod display_setting_request;
mod channels;
pub use board_info::*; pub use board_info::*;
pub use mqtt::*;
pub use udp::*; pub use udp::*;
pub use board::*; pub use board::*;
pub use display_setting_request::*;
pub use channels::*;

View File

@@ -1,163 +0,0 @@
use paho_mqtt as mqtt;
use paris::{info, warn};
use serde_json::json;
use std::time::Duration;
use time::{format_description, OffsetDateTime};
use tokio::{sync::OnceCell, task};
use crate::ambient_light::{ColorCalibration, ConfigManager};
const DISPLAY_TOPIC: &'static str = "display-ambient-light/display";
const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop";
const COLOR_CALIBRATION: &'static str = "display-ambient-light/desktop/color-calibration";
pub struct MqttRpc {
client: mqtt::AsyncClient,
// change_display_brightness_tx: broadcast::Sender<display::DisplayBrightness>,
// message_tx: broadcast::Sender<models::CmdMqMessage>,
}
impl MqttRpc {
pub async fn global() -> &'static Self {
static MQTT_RPC: OnceCell<MqttRpc> = OnceCell::const_new();
MQTT_RPC
.get_or_init(|| async {
let mqtt_rpc = MqttRpc::new().await.unwrap();
mqtt_rpc.initialize().await.unwrap();
mqtt_rpc
})
.await
}
pub async fn new() -> anyhow::Result<Self> {
let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883")
.map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?;
client.set_connected_callback(|client| {
info!("MQTT server connected.");
client.subscribe("display-ambient-light/board/#", mqtt::QOS_1);
client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1);
});
client.set_connection_lost_callback(|_| {
info!("MQTT server connection lost.");
});
client.set_disconnected_callback(|_, a1, a2| {
info!("MQTT server disconnected. {:?} {:?}", a1, a2);
});
let mut last_will_payload = serde_json::Map::new();
last_will_payload.insert("message".to_string(), json!("offline"));
last_will_payload.insert(
"time".to_string(),
serde_json::Value::String(
OffsetDateTime::now_utc()
.format(&time::format_description::well_known::iso8601::Iso8601::DEFAULT)
.unwrap()
.to_string(),
),
);
let last_will = mqtt::Message::new(
format!("{}/status", DESKTOP_TOPIC),
serde_json::to_string(&last_will_payload)
.unwrap()
.as_bytes(),
mqtt::QOS_1,
);
let connect_options = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(5))
.will_message(last_will)
.automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5))
.finalize();
let token = client.connect(connect_options);
token.await.map_err(|err| {
anyhow::anyhow!(
"can not connect MQTT server. wait for connect token failed. {:?}",
err
)
})?;
// let (change_display_brightness_tx, _) =
// broadcast::channel::<display::DisplayBrightness>(16);
// let (message_tx, _) = broadcast::channel::<models::CmdMqMessage>(32);
Ok(Self { client })
}
pub async fn initialize(&self) -> anyhow::Result<()> {
self.broadcast_desktop_online();
Self::publish_color_calibration_worker();
anyhow::Ok(())
}
fn publish_color_calibration_worker() {
tokio::spawn(async move {
let mqtt = Self::global().await;
let config_manager = ConfigManager::global().await;
let mut config_receiver = config_manager.clone_config_update_receiver();
let config = config_manager.configs().await;
if let Err(err) = mqtt
.publish_color_calibration(config.color_calibration)
.await
{
warn!("can not publish color calibration. {}", err);
}
while config_receiver.changed().await.is_ok() {
let config = config_receiver.borrow().clone();
if let Err(err) = mqtt
.publish_color_calibration(config.color_calibration)
.await
{
warn!("can not publish color calibration. {}", err);
}
}
});
}
fn broadcast_desktop_online(&self) {
let client = self.client.to_owned();
task::spawn(async move {
loop {
match OffsetDateTime::now_utc()
.format(&format_description::well_known::Iso8601::DEFAULT)
{
Ok(now_str) => {
let msg = mqtt::Message::new(
"display-ambient-light/desktop/online",
now_str.as_bytes(),
mqtt::QOS_0,
);
match client.publish(msg).await {
Ok(_) => {}
Err(error) => {
warn!("can not publish last online time. {}", error)
}
}
}
Err(error) => {
warn!("can not get time for now. {}", error);
}
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
});
}
pub async fn publish_color_calibration(&self, payload: ColorCalibration) -> anyhow::Result<()> {
self.client
.publish(mqtt::Message::new(
COLOR_CALIBRATION,
payload.to_bytes(),
mqtt::QOS_1,
))
.await
.map_err(|error| anyhow::anyhow!("mqtt publish color calibration failed. {}", error))
}
}

View File

@@ -1,12 +1,9 @@
use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration}; use std::{collections::HashMap, sync::Arc, time::Duration};
use futures::future::join_all; use futures::future::join_all;
use mdns_sd::{ServiceDaemon, ServiceEvent}; use mdns_sd::{ServiceDaemon, ServiceEvent};
use paris::{error, info, warn}; use paris::{error, info, warn};
use tokio::{ use tokio::sync::{watch, OnceCell, RwLock};
net::UdpSocket,
sync::{watch, OnceCell, RwLock},
};
use super::{Board, BoardInfo}; use super::{Board, BoardInfo};
@@ -102,7 +99,9 @@ impl UdpRpc {
} }
if boards.insert(board_info.fullname.clone(), board).is_some() { if boards.insert(board_info.fullname.clone(), board).is_some() {
info!("added board {:?}", board_info); info!("replace board {:?}", board_info);
} else {
info!("add board {:?}", board_info);
} }
let tx_boards = boards let tx_boards = boards

View File

@@ -1,5 +1,6 @@
use std::iter; use std::fmt::Formatter;
use std::{iter, fmt::Debug};
use std::sync::Arc; use std::sync::Arc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -7,17 +8,30 @@ use tauri::async_runtime::RwLock;
use crate::{ambient_light::LedStripConfig, led_color::LedColor}; use crate::{ambient_light::LedStripConfig, led_color::LedColor};
#[derive(Debug, Clone)] #[derive(Clone)]
pub struct Screenshot { pub struct Screenshot {
pub display_id: u32, pub display_id: u32,
pub height: u32, pub height: u32,
pub width: u32, pub width: u32,
pub bytes_per_row: usize, pub bytes_per_row: usize,
pub bytes: Arc<RwLock<Vec<u8>>>, pub bytes: Arc<RwLock<Arc<Vec<u8>>>>,
pub scale_factor: f32, pub scale_factor: f32,
pub bound_scale_factor: f32, pub bound_scale_factor: f32,
} }
impl Debug for Screenshot {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Screenshot")
.field("display_id", &self.display_id)
.field("height", &self.height)
.field("width", &self.width)
.field("bytes_per_row", &self.bytes_per_row)
.field("scale_factor", &self.scale_factor)
.field("bound_scale_factor", &self.bound_scale_factor)
.finish()
}
}
static SINGLE_AXIS_POINTS: usize = 5; static SINGLE_AXIS_POINTS: usize = 5;
impl Screenshot { impl Screenshot {
@@ -26,7 +40,7 @@ impl Screenshot {
height: u32, height: u32,
width: u32, width: u32,
bytes_per_row: usize, bytes_per_row: usize,
bytes: Vec<u8>, bytes: Arc<Vec<u8>>,
scale_factor: f32, scale_factor: f32,
bound_scale_factor: f32, bound_scale_factor: f32,
) -> Self { ) -> Self {

View File

@@ -1,51 +1,20 @@
use std::time::Duration;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use core_graphics::display::{ use core_graphics::display::{
kCGNullWindowID, kCGWindowImageDefault, kCGWindowListOptionOnScreenOnly, CGDisplay, kCGNullWindowID, kCGWindowImageDefault, kCGWindowListOptionOnScreenOnly, CGDisplay,
}; };
use core_graphics::geometry::{CGPoint, CGRect, CGSize}; use core_graphics::geometry::{CGPoint, CGRect, CGSize};
use paris::warn; use paris::{info, warn};
use rust_swift_screencapture::display::CGDisplayId;
use tauri::async_runtime::RwLock; use tauri::async_runtime::RwLock;
use tokio::sync::{broadcast, watch, OnceCell}; use tokio::sync::{broadcast, watch, Mutex, OnceCell};
use tokio::time::{self, Duration}; use tokio::task::yield_now;
use tokio::time::sleep;
use crate::screenshot::LedSamplePoints; use crate::screenshot::LedSamplePoints;
use crate::{ambient_light::SamplePointMapper, led_color::LedColor, screenshot::Screenshot}; use crate::{ambient_light::SamplePointMapper, led_color::LedColor, screenshot::Screenshot};
pub fn take_screenshot(display_id: u32, scale_factor: f32) -> anyhow::Result<Screenshot> {
log::debug!("take_screenshot");
let cg_display = CGDisplay::new(display_id);
let cg_image = CGDisplay::screenshot(
cg_display.bounds(),
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID,
kCGWindowImageDefault,
)
.ok_or_else(|| anyhow::anyhow!("Display#{}: take screenshot failed", display_id))?;
let buffer = cg_image.data();
let bytes_per_row = cg_image.bytes_per_row();
let height = cg_image.height();
let width = cg_image.width();
let bytes = buffer.bytes().to_owned();
let cg_display = CGDisplay::new(display_id);
let bound_scale_factor = (cg_display.bounds().size.width / width as f64) as f32;
Ok(Screenshot::new(
display_id,
height as u32,
width as u32,
bytes_per_row,
bytes,
scale_factor,
bound_scale_factor,
))
}
pub fn get_display_colors( pub fn get_display_colors(
display_id: u32, display_id: u32,
sample_points: &Vec<Vec<LedSamplePoints>>, sample_points: &Vec<Vec<LedSamplePoints>>,
@@ -114,7 +83,7 @@ pub fn get_display_colors(
} }
pub struct ScreenshotManager { pub struct ScreenshotManager {
pub channels: Arc<RwLock<HashMap<u32, watch::Receiver<Screenshot>>>>, pub channels: Arc<RwLock<HashMap<u32, Arc<RwLock<watch::Sender<Screenshot>>>>>>,
merged_screenshot_tx: Arc<RwLock<broadcast::Sender<Screenshot>>>, merged_screenshot_tx: Arc<RwLock<broadcast::Sender<Screenshot>>>,
} }
@@ -134,74 +103,77 @@ impl ScreenshotManager {
.await .await
} }
pub fn start(&self) -> anyhow::Result<()> { pub async fn start(&self) -> anyhow::Result<()> {
let displays = display_info::DisplayInfo::all()?; let displays = display_info::DisplayInfo::all()?;
for display in displays {
self.start_one(display.id, display.scale_factor)?;
}
Ok(())
}
fn start_one(&self, display_id: u32, scale_factor: f32) -> anyhow::Result<()> { let futures = displays.iter().map(|display| async {
let channels = self.channels.to_owned(); self.start_one(display.id, display.scale_factor)
let merged_screenshot_tx = self.merged_screenshot_tx.clone(); .await
tokio::spawn(async move { .unwrap_or_else(|err| {
let screenshot = take_screenshot(display_id, scale_factor); warn!("start_one failed: display_id: {}, err: {}", display.id, err);
});
if screenshot.is_err() { info!("start_one finished: display_id: {}", display.id);
warn!("take_screenshot_loop: {}", screenshot.err().unwrap());
return;
}
let mut interval = time::interval(Duration::from_millis(1000));
let screenshot = screenshot.unwrap();
let (screenshot_tx, screenshot_rx) = watch::channel(screenshot);
{
let channels = channels.clone();
let mut channels = channels.write().await;
channels.insert(display_id, screenshot_rx.clone());
}
let merged_screenshot_tx = merged_screenshot_tx.read().await.clone();
loop {
Self::take_screenshot_loop(
display_id,
scale_factor,
&screenshot_tx,
&merged_screenshot_tx,
)
.await;
interval.tick().await;
tokio::time::sleep(Duration::from_millis(1)).await;
}
}); });
futures::future::join_all(futures).await;
Ok(()) Ok(())
} }
async fn take_screenshot_loop( async fn start_one(&self, display_id: u32, scale_factor: f32) -> anyhow::Result<()> {
display_id: u32, let merged_screenshot_tx = self.merged_screenshot_tx.clone();
scale_factor: f32,
screenshot_tx: &watch::Sender<Screenshot>, let (tx, _) = watch::channel(Screenshot::new(
merged_screenshot_tx: &broadcast::Sender<Screenshot>, display_id,
) { 0,
let screenshot = take_screenshot(display_id, scale_factor); 0,
if let Ok(screenshot) = screenshot { 0,
match merged_screenshot_tx.send(screenshot.clone()) { Arc::new(vec![]),
Ok(_) => { scale_factor,
log::info!( scale_factor,
"take_screenshot_loop: merged_screenshot_tx.send success. display#{}", ));
display_id let tx = Arc::new(RwLock::new(tx));
);
let mut channels = self.channels.write().await;
channels.insert(display_id, tx.clone());
drop(channels);
loop {
let display = rust_swift_screencapture::display::Display::new(display_id);
let mut frame_rx = display.subscribe_frame().await;
display.start_capture(30).await;
let tx_for_send = tx.read().await;
while frame_rx.changed().await.is_ok() {
let frame = frame_rx.borrow().clone();
let screenshot = Screenshot::new(
display_id,
frame.height as u32,
frame.width as u32,
frame.bytes_per_row as usize,
frame.bytes,
scale_factor,
scale_factor,
);
let merged_screenshot_tx = merged_screenshot_tx.write().await;
if let Err(err) = merged_screenshot_tx.send(screenshot.clone()) {
// log::warn!("merged_screenshot_tx.send failed: {}", err);
} }
Err(_) => { if let Err(err) = tx_for_send.send(screenshot.clone()) {
log::warn!("display {} screenshot_tx.send failed: {}", display_id, err);
} else {
log::debug!("screenshot: {:?}", screenshot);
} }
yield_now().await;
} }
screenshot_tx.send(screenshot).unwrap(); sleep(Duration::from_secs(5)).await;
// log::info!("take_screenshot_loop: send success. display#{}", display_id) info!(
} else { "display {} frame_rx.changed() failed, try to restart",
warn!("take_screenshot_loop: {}", screenshot.err().unwrap()); display_id
);
} }
} }
@@ -257,4 +229,16 @@ impl ScreenshotManager {
pub async fn clone_merged_screenshot_rx(&self) -> broadcast::Receiver<Screenshot> { pub async fn clone_merged_screenshot_rx(&self) -> broadcast::Receiver<Screenshot> {
self.merged_screenshot_tx.read().await.subscribe() self.merged_screenshot_tx.read().await.subscribe()
} }
pub async fn subscribe_by_display_id(
&self,
display_id: CGDisplayId,
) -> anyhow::Result<watch::Receiver<Screenshot>> {
let channels = self.channels.read().await;
if let Some(tx) = channels.get(&display_id) {
Ok(tx.read().await.subscribe())
} else {
Err(anyhow::anyhow!("display_id: {} not found", display_id))
}
}
} }

View File

@@ -0,0 +1,203 @@
use std::{mem, sync::Arc};
use coreaudio::{
audio_unit::macos_helpers::get_default_device_id,
sys::{
kAudioHardwareServiceDeviceProperty_VirtualMasterVolume, kAudioObjectPropertyScopeOutput,
AudioObjectGetPropertyData, AudioObjectHasProperty, AudioObjectPropertyAddress,
AudioObjectSetPropertyData,
},
};
use paris::error;
use tokio::sync::{OnceCell, RwLock};
use crate::rpc::BoardMessageChannels;
pub struct VolumeManager {
current_volume: Arc<RwLock<f32>>,
handler: Option<tokio::task::JoinHandle<()>>,
read_handler: Option<tokio::task::JoinHandle<()>>,
}
impl VolumeManager {
pub async fn global() -> &'static Self {
static VOLUME_MANAGER: OnceCell<VolumeManager> = OnceCell::const_new();
VOLUME_MANAGER
.get_or_init(|| async { Self::create() })
.await
}
pub fn create() -> Self {
let mut instance = Self {
current_volume: Arc::new(RwLock::new(0.0)),
handler: None,
read_handler: None,
};
instance.subscribe_volume_setting_request();
instance.auto_read_volume();
instance
}
fn subscribe_volume_setting_request(&mut self) {
let handler = tokio::spawn(async {
let channels = BoardMessageChannels::global().await;
let mut request_rx = channels.volume_setting_request_sender.subscribe();
while let Ok(volume) = request_rx.recv().await {
if let Err(err) = Self::set_volume(volume) {
error!("failed to set volume: {}", err);
}
}
});
self.handler = Some(handler);
}
fn auto_read_volume(&mut self) {
let current_volume = self.current_volume.clone();
let handler = tokio::spawn(async move {
let channel = BoardMessageChannels::global().await;
let volume_changed_tx = channel.volume_changed_sender.clone();
loop {
match Self::read_volume() {
Ok(value) => {
let mut volume = current_volume.write().await;
if *volume != value {
if let Err(err) = volume_changed_tx.send(value) {
error!("failed to send volume changed event: {}", err);
}
}
*volume = value;
}
Err(err) => {
error!("failed to read volume: {}", err);
}
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});
self.read_handler = Some(handler);
}
fn set_volume(volume: f32) -> anyhow::Result<()> {
log::debug!("set volume: {}", volume);
let device_id = get_default_device_id(false);
if device_id.is_none() {
anyhow::bail!("default audio output device is not found.");
}
let device_id = device_id.unwrap();
let address = AudioObjectPropertyAddress {
mSelector: kAudioHardwareServiceDeviceProperty_VirtualMasterVolume,
mScope: kAudioObjectPropertyScopeOutput,
mElement: 0,
};
log::debug!("device id: {}", device_id);
log::debug!("address: {:?}", address);
if 0 == unsafe { AudioObjectHasProperty(device_id, &address) } {
anyhow::bail!("Can not get audio property");
}
let size = mem::size_of::<f32>() as u32;
let result = unsafe {
AudioObjectSetPropertyData(
device_id,
&address,
0,
std::ptr::null(),
size,
&volume as *const f32 as *const std::ffi::c_void,
)
};
if result != 0 {
anyhow::bail!("Can not set audio property");
}
Ok(())
}
fn read_volume() -> anyhow::Result<f32> {
let device_id = get_default_device_id(false);
if device_id.is_none() {
anyhow::bail!("default audio output device is not found.");
}
let device_id = device_id.unwrap();
let address = AudioObjectPropertyAddress {
mSelector: kAudioHardwareServiceDeviceProperty_VirtualMasterVolume,
mScope: kAudioObjectPropertyScopeOutput,
mElement: 0,
};
log::debug!("device id: {}", device_id);
log::debug!("address: {:?}", address);
if 0 == unsafe { AudioObjectHasProperty(device_id, &address) } {
anyhow::bail!("Can not get audio property");
}
let mut size = mem::size_of::<f32>() as u32;
let mut volume = 0.0f32;
let result = unsafe {
AudioObjectGetPropertyData(
device_id,
&address,
0,
std::ptr::null(),
&mut size,
&mut volume as *mut f32 as *mut std::ffi::c_void,
)
};
if result != 0 {
anyhow::bail!("Can not get audio property. result: {}", result);
}
if size != mem::size_of::<f32>() as u32 {
anyhow::bail!("Can not get audio property. data size is not matched.");
}
log::debug!("current system volume of primary device: {}", volume);
Ok(volume)
}
pub async fn get_volume(&self) -> f32 {
self.current_volume.read().await.clone()
}
}
impl Drop for VolumeManager {
fn drop(&mut self) {
log::info!("drop volume manager");
if let Some(handler) = self.handler.take() {
tokio::task::block_in_place(move || {
handler.abort();
});
}
if let Some(handler) = self.read_handler.take() {
tokio::task::block_in_place(move || {
handler.abort();
});
}
}
}

View File

@@ -0,0 +1,3 @@
mod manager;
pub use manager::*;

View File

@@ -28,7 +28,10 @@
"icons/icon.ico" "icons/icon.ico"
], ],
"identifier": "cc.ivanli.ambient-light.desktop", "identifier": "cc.ivanli.ambient-light.desktop",
"targets": "all" "targets": "all",
"macOS": {
"minimumSystemVersion": "13"
}
}, },
"security": { "security": {
"csp": null "csp": null