9 Commits

19 changed files with 1431 additions and 1207 deletions

View File

@ -14,20 +14,20 @@
"@solidjs/router": "^0.8.2",
"@tauri-apps/api": "^1.3.0",
"debug": "^4.3.4",
"solid-icons": "^1.0.4",
"solid-js": "^1.7.4",
"solid-icons": "^1.0.8",
"solid-js": "^1.7.6",
"solid-tippy": "^0.2.1",
"tippy.js": "^6.3.7"
},
"devDependencies": {
"@tauri-apps/cli": "^1.3.0",
"@types/debug": "^4.1.7",
"@types/node": "^18.16.3",
"@tauri-apps/cli": "^1.3.1",
"@types/debug": "^4.1.8",
"@types/node": "^18.16.17",
"autoprefixer": "^10.4.14",
"postcss": "^8.4.23",
"postcss": "^8.4.24",
"tailwindcss": "^3.3.2",
"typescript": "^4.9.5",
"vite": "^4.3.4",
"vite": "^4.3.9",
"vite-plugin-solid": "^2.7.0"
}
}

750
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

598
src-tauri/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -37,6 +37,7 @@ mdns-sd = "0.7.2"
futures = "0.3.28"
ddc-hi = "0.4.1"
coreaudio-rs = "0.11.2"
rust_swift_screencapture = { version = "0.1.1", path = "../../../../demo/rust-swift-screencapture" }
[features]
# this feature is used for production builds or when `devPath` points to the filesystem

View File

@ -1,7 +1,7 @@
use std::{borrow::BorrowMut, sync::Arc};
use tauri::async_runtime::RwLock;
use tokio::sync::OnceCell;
use tokio::{sync::OnceCell, task::yield_now};
use crate::ambient_light::{config, LedStripConfigGroup};
@ -9,7 +9,6 @@ use super::{Border, SamplePointMapper, ColorCalibration};
pub struct ConfigManager {
config: Arc<RwLock<LedStripConfigGroup>>,
config_update_receiver: tokio::sync::watch::Receiver<LedStripConfigGroup>,
config_update_sender: tokio::sync::watch::Sender<LedStripConfigGroup>,
}
@ -22,10 +21,12 @@ impl ConfigManager {
let (config_update_sender, config_update_receiver) =
tokio::sync::watch::channel(configs.clone());
config_update_sender.send(configs.clone()).unwrap();
if let Err(err) = config_update_sender.send(configs.clone()) {
log::error!("Failed to send config update when read config first time: {}", err);
}
drop(config_update_receiver);
ConfigManager {
config: Arc::new(RwLock::new(configs)),
config_update_receiver,
config_update_sender,
}
})
@ -46,8 +47,9 @@ impl ConfigManager {
self.config_update_sender
.send(configs.clone())
.map_err(|e| anyhow::anyhow!("Failed to send config update: {}", e))?;
yield_now().await;
// log::info!("config updated: {:?}", configs);
log::debug!("config updated: {:?}", configs);
Ok(())
}
@ -221,7 +223,7 @@ impl ConfigManager {
pub fn clone_config_update_receiver(
&self,
) -> tokio::sync::watch::Receiver<LedStripConfigGroup> {
self.config_update_receiver.clone()
self.config_update_sender.subscribe()
}
pub async fn set_color_calibration(&self, color_calibration: ColorCalibration) -> anyhow::Result<()> {

View File

@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{borrow::Borrow, collections::HashMap, sync::Arc, time::Duration};
use paris::warn;
use tauri::async_runtime::RwLock;
@ -11,8 +11,9 @@ use tokio::{
use crate::{
ambient_light::{config, ConfigManager},
led_color::LedColor,
screenshot::LedSamplePoints,
screenshot_manager::{self, ScreenshotManager}, rpc::UdpRpc,
rpc::UdpRpc,
screenshot::{self, LedSamplePoints},
screenshot_manager::{self, ScreenshotManager},
};
use itertools::Itertools;
@ -48,37 +49,63 @@ impl LedColorsPublisher {
.await
}
fn start_one_display_colors_fetcher(
async fn start_one_display_colors_fetcher(
&self,
display_id: u32,
sample_points: Vec<Vec<LedSamplePoints>>,
sample_points: Vec<LedSamplePoints>,
bound_scale_factor: f32,
mappers: Vec<SamplePointMapper>,
display_colors_tx: broadcast::Sender<(u32, Vec<u8>)>,
) {
let internal_tasks_version = self.inner_tasks_version.clone();
let screenshot_manager = ScreenshotManager::global().await;
let screenshot_rx = screenshot_manager.subscribe_by_display_id(display_id).await;
if let Err(err) = screenshot_rx {
log::error!("{}", err);
return;
}
let mut screenshot_rx = screenshot_rx.unwrap();
tokio::spawn(async move {
let colors = screenshot_manager::get_display_colors(
display_id,
&sample_points,
bound_scale_factor,
);
if let Err(err) = colors {
warn!("Failed to get colors: {}", err);
return;
}
let mut interval = tokio::time::interval(Duration::from_millis(33));
let init_version = internal_tasks_version.read().await.clone();
loop {
interval.tick().await;
tokio::time::sleep(Duration::from_millis(1)).await;
while screenshot_rx.changed().await.is_ok() {
let screenshot = screenshot_rx.borrow().clone();
let colors = screenshot.get_colors_by_sample_points(&sample_points).await;
let colors_copy = colors.clone();
let mappers = mappers.clone();
match Self::send_colors_by_display(colors, mappers).await {
Ok(_) => {
// log::info!("sent colors: #{: >15}", display_id);
}
Err(err) => {
warn!("Failed to send colors: #{: >15}\t{}", display_id, err);
}
}
// match display_colors_tx.send((
// display_id,
// colors_copy
// .into_iter()
// .map(|color| color.get_rgb())
// .flatten()
// .collect::<Vec<_>>(),
// )) {
// Ok(_) => {
// // log::info!("sent colors: {:?}", color_len);
// }
// Err(err) => {
// warn!("Failed to send display_colors: {}", err);
// }
// };
// Check if the inner task version changed
let version = internal_tasks_version.read().await.clone();
if version != init_version {
log::info!(
"inner task version changed, stop. {} != {}",
@ -88,51 +115,6 @@ impl LedColorsPublisher {
break;
}
let colors = screenshot_manager::get_display_colors(
display_id,
&sample_points,
bound_scale_factor,
);
if let Err(err) = colors {
warn!("Failed to get colors: {}", err);
sleep(Duration::from_millis(100)).await;
continue;
}
let colors: Vec<crate::led_color::LedColor> = colors.unwrap();
let colors_copy = colors.clone();
let mappers = mappers.clone();
tokio::spawn(async move {
match Self::send_colors_by_display(colors, mappers).await {
Ok(_) => {
// log::info!("sent colors: #{: >15}", display_id);
}
Err(err) => {
warn!("Failed to send colors: #{: >15}\t{}", display_id, err);
}
}
});
match display_colors_tx.send((
display_id,
colors_copy
.into_iter()
.map(|color| color.get_rgb())
.flatten()
.collect::<Vec<_>>(),
)) {
Ok(_) => {
// log::info!("sent colors: {:?}", color_len);
}
Err(err) => {
warn!("Failed to send display_colors: {}", err);
}
};
}
});
}
@ -212,58 +194,61 @@ impl LedColorsPublisher {
});
}
pub fn start(&self) {
pub async fn start(&self) {
log::info!("start colors worker");
let config_manager = ConfigManager::global().await;
let mut config_receiver = config_manager.clone_config_update_receiver();
let configs = config_receiver.borrow().clone();
self.handle_config_change(configs).await;
log::info!("waiting for config update...");
while config_receiver.changed().await.is_ok() {
log::info!("config updated, restart inner tasks...");
let configs = config_receiver.borrow().clone();
self.handle_config_change(configs).await;
}
}
async fn handle_config_change(&self, configs: LedStripConfigGroup) {
let inner_tasks_version = self.inner_tasks_version.clone();
let configs = Self::get_colors_configs(&configs).await;
tokio::spawn(async move {
let publisher = Self::global().await;
if let Err(err) = configs {
warn!("Failed to get configs: {}", err);
sleep(Duration::from_millis(100)).await;
return;
}
let config_manager = ConfigManager::global().await;
let mut config_receiver = config_manager.clone_config_update_receiver();
let configs = configs.unwrap();
log::info!("waiting for config update...");
let mut inner_tasks_version = inner_tasks_version.write().await;
*inner_tasks_version = inner_tasks_version.overflowing_add(1).0;
drop(inner_tasks_version);
while config_receiver.changed().await.is_ok() {
log::info!("config updated, restart inner tasks...");
let configs = config_receiver.borrow().clone();
let configs = Self::get_colors_configs(&configs).await;
let (display_colors_tx, display_colors_rx) = broadcast::channel::<(u32, Vec<u8>)>(8);
if let Err(err) = configs {
warn!("Failed to get configs: {}", err);
sleep(Duration::from_millis(100)).await;
continue;
}
for sample_point_group in configs.sample_point_groups.clone() {
let display_id = sample_point_group.display_id;
let sample_points = sample_point_group.points;
let bound_scale_factor = sample_point_group.bound_scale_factor;
self.start_one_display_colors_fetcher(
display_id,
sample_points,
bound_scale_factor,
sample_point_group.mappers,
display_colors_tx.clone(),
)
.await;
}
let configs = configs.unwrap();
let mut inner_tasks_version = inner_tasks_version.write().await;
*inner_tasks_version = inner_tasks_version.overflowing_add(1).0;
drop(inner_tasks_version);
let (display_colors_tx, display_colors_rx) =
broadcast::channel::<(u32, Vec<u8>)>(8);
for sample_point_group in configs.sample_point_groups.clone() {
let display_id = sample_point_group.display_id;
let sample_points = sample_point_group.points;
let bound_scale_factor = sample_point_group.bound_scale_factor;
publisher.start_one_display_colors_fetcher(
display_id,
sample_points,
bound_scale_factor,
sample_point_group.mappers,
display_colors_tx.clone(),
);
}
let display_ids = configs.sample_point_groups;
publisher.start_all_colors_worker(
display_ids.iter().map(|c| c.display_id).collect(),
configs.mappers,
display_colors_rx,
);
}
});
let display_ids = configs.sample_point_groups;
self.start_all_colors_worker(
display_ids.iter().map(|c| c.display_id).collect(),
configs.mappers,
display_colors_rx,
);
}
pub async fn send_colors(offset: u16, mut payload: Vec<u8>) -> anyhow::Result<()> {
@ -402,6 +387,7 @@ impl LedColorsPublisher {
let points: Vec<_> = led_strip_configs
.clone()
.map(|(_, config)| screenshot.get_sample_points(&config))
.flatten()
.collect();
if points.len() == 0 {
@ -451,7 +437,7 @@ pub struct AllColorConfig {
#[derive(Debug, Clone)]
pub struct DisplaySamplePointGroup {
pub display_id: u32,
pub points: Vec<Vec<LedSamplePoints>>,
pub points: Vec<LedSamplePoints>,
pub bound_scale_factor: f32,
pub mappers: Vec<config::SamplePointMapper>,
}

View File

@ -14,7 +14,7 @@ impl DisplayHandler {
pub async fn fetch_state(&self) {
let mut controller = self.controller.write().await;
let mut temp_state = DisplayState::default();
let mut temp_state = self.state.read().await.clone();
match controller.handle.get_vcp_feature(0x10) {
Ok(value) => {

View File

@ -34,3 +34,15 @@ impl DisplayState {
}
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct DisplayStateWrapper {
pub version: u8,
pub states: Vec<DisplayState>,
}
impl DisplayStateWrapper {
pub fn new(states: Vec<DisplayState>) -> Self {
Self { version: 1, states }
}
}

View File

@ -1,20 +1,27 @@
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use std::{env::current_dir, sync::Arc, time::Duration};
use ddc_hi::Display;
use paris::{error, info, warn};
use tokio::{sync::{watch, OnceCell, RwLock}, task::yield_now};
use tauri::api::path::config_dir;
use tokio::{
sync::{broadcast, watch, OnceCell, RwLock},
task::yield_now,
};
use crate::rpc::{BoardMessageChannels, DisplaySetting};
use crate::{
display::DisplayStateWrapper,
rpc::{BoardMessageChannels, DisplaySetting},
};
use super::{display_handler::DisplayHandler, display_state::DisplayState};
const CONFIG_FILE_NAME: &str = "cc.ivanli.ambient_light/displays.toml";
pub struct DisplayManager {
displays: Arc<RwLock<Vec<Arc<RwLock<DisplayHandler>>>>>,
setting_request_handler: Option<tokio::task::JoinHandle<()>>,
displays_changed_sender: Arc<watch::Sender<Vec<DisplayState>>>,
auto_save_state_handler: Option<tokio::task::JoinHandle<()>>,
}
impl DisplayManager {
@ -32,12 +39,45 @@ impl DisplayManager {
displays: Arc::new(RwLock::new(Vec::new())),
setting_request_handler: None,
displays_changed_sender,
auto_save_state_handler: None,
};
instance.fetch_displays().await;
instance.restore_states().await;
instance.fetch_state_of_displays().await;
instance.subscribe_setting_request();
instance.auto_save_state_of_displays();
instance
}
fn auto_save_state_of_displays(&mut self) {
let displays = self.displays.clone();
let handler = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
Self::save_states(displays.clone()).await;
Self::send_displays_changed(displays.clone()).await;
}
});
self.auto_save_state_handler = Some(handler);
}
async fn send_displays_changed(displays: Arc<RwLock<Vec<Arc<RwLock<DisplayHandler>>>>>) {
let mut states = Vec::new();
for display in displays.read().await.iter() {
let state = display.read().await.state.read().await.clone();
states.push(state);
}
let channel = BoardMessageChannels::global().await;
let tx = channel.displays_changed_sender.clone();
if let Err(err) = tx.send(states) {
error!("Failed to send displays changed: {}", err);
}
}
async fn fetch_displays(&self) {
let mut displays = self.displays.write().await;
displays.clear();
@ -52,12 +92,19 @@ impl DisplayManager {
controller: controller.clone(),
};
handler.fetch_state().await;
displays.push(Arc::new(RwLock::new(handler)));
}
}
async fn fetch_state_of_displays(&self) {
let displays = self.displays.read().await;
for display in displays.iter() {
let display = display.read().await;
display.fetch_state().await;
}
}
pub async fn get_displays(&self) -> Vec<DisplayState> {
let displays = self.displays.read().await;
let mut states = Vec::new();
@ -75,7 +122,22 @@ impl DisplayManager {
let channels = BoardMessageChannels::global().await;
let mut request_rx = channels.display_setting_request_sender.subscribe();
while let Ok(message) = request_rx.recv().await {
loop {
if let Err(err) = request_rx.recv().await {
match err {
broadcast::error::RecvError::Closed => {
info!("display setting request channel closed");
break;
}
broadcast::error::RecvError::Lagged(_) => {
warn!("display setting request channel lagged");
continue;
}
}
}
let message = request_rx.recv().await.unwrap();
let displays = displays.write().await;
let display = displays.get(message.display_index);
@ -84,7 +146,6 @@ impl DisplayManager {
continue;
}
let display = display.unwrap().write().await;
let result = match message.setting {
DisplaySetting::Brightness(value) => display.set_brightness(value as u16).await,
@ -115,6 +176,93 @@ impl DisplayManager {
self.setting_request_handler = Some(handler);
}
async fn restore_states(&self) {
let path = config_dir()
.unwrap_or(current_dir().unwrap())
.join(CONFIG_FILE_NAME);
if !path.exists() {
log::info!("config file not found: {}. skip read.", path.display());
return;
}
let text = std::fs::read_to_string(path);
if let Err(err) = text {
log::error!("failed to read config file: {}", err);
return;
}
let text = text.unwrap();
let wrapper = toml::from_str::<DisplayStateWrapper>(&text);
if let Err(err) = wrapper {
log::error!("failed to parse display states file: {}", err);
return;
}
let states = wrapper.unwrap().states;
let displays = self.displays.read().await;
for (index, display) in displays.iter().enumerate() {
let display = display.read().await;
let mut state = display.state.write().await;
let saved = states.get(index);
if let Some(saved) = saved {
state.brightness = saved.brightness;
state.contrast = saved.contrast;
state.mode = saved.mode;
log::info!("restore display config. display#{}: {:?}", index, state);
}
}
log::info!(
"restore display config. store displays: {}, online displays: {}",
states.len(),
displays.len()
);
}
async fn save_states(displays: Arc<RwLock<Vec<Arc<RwLock<DisplayHandler>>>>>) {
let path = config_dir()
.unwrap_or(current_dir().unwrap())
.join(CONFIG_FILE_NAME);
let displays = displays.read().await;
let mut states = Vec::new();
for display in displays.iter() {
let state = display.read().await.state.read().await.clone();
states.push(state);
}
let wrapper = DisplayStateWrapper::new(states);
let text = toml::to_string(&wrapper);
if let Err(err) = text {
log::error!("failed to serialize display states: {}", err);
log::error!("display states: {:?}", &wrapper);
return;
}
let text = text.unwrap();
if path.exists() {
if let Err(err) = std::fs::remove_file(&path) {
log::error!("failed to remove old config file: {}", err);
return;
}
}
if let Err(err) = std::fs::write(&path, text) {
log::error!("failed to write config file: {}", err);
return;
}
log::debug!(
"save display config. store displays: {}, online displays: {}",
wrapper.states.len(),
displays.len()
);
}
pub fn subscribe_displays_changed(&self) -> watch::Receiver<Vec<DisplayState>> {
self.displays_changed_sender.subscribe()
}
@ -122,8 +270,13 @@ impl DisplayManager {
impl Drop for DisplayManager {
fn drop(&mut self) {
log::info!("dropping display manager=============");
if let Some(handler) = self.setting_request_handler.take() {
handler.abort();
}
if let Some(handler) = self.auto_save_state_handler.take() {
handler.abort();
}
}
}

View File

@ -13,7 +13,7 @@ use ambient_light::{Border, ColorCalibration, LedStripConfig, LedStripConfigGrou
use display::{DisplayManager, DisplayState};
use display_info::DisplayInfo;
use paris::{error, info, warn};
use rpc::{BoardInfo, MqttRpc, UdpRpc};
use rpc::{BoardInfo, UdpRpc};
use screenshot::Screenshot;
use screenshot_manager::ScreenshotManager;
use serde::{Deserialize, Serialize};
@ -88,7 +88,7 @@ async fn get_led_strips_sample_points(
let screenshot_manager = ScreenshotManager::global().await;
let channels = screenshot_manager.channels.read().await;
if let Some(rx) = channels.get(&config.display_id) {
let rx = rx.clone();
let rx = rx.read().await;
let screenshot = rx.borrow().clone();
let sample_points = screenshot.get_sample_points(&config);
Ok(sample_points)
@ -105,7 +105,7 @@ async fn get_one_edge_colors(
let screenshot_manager = ScreenshotManager::global().await;
let channels = screenshot_manager.channels.read().await;
if let Some(rx) = channels.get(&display_id) {
let rx = rx.clone();
let rx = rx.read().await;
let screenshot = rx.borrow().clone();
let bytes = screenshot.bytes.read().await.to_owned();
let colors =
@ -217,13 +217,17 @@ async fn get_displays() -> Vec<DisplayState> {
async fn main() {
env_logger::init();
let screenshot_manager = ScreenshotManager::global().await;
screenshot_manager.start().unwrap();
tokio::spawn(async move {
let screenshot_manager = ScreenshotManager::global().await;
screenshot_manager.start().await.unwrap_or_else(|e| {
error!("can not start screenshot manager: {}", e);
})
});
let led_color_publisher = ambient_light::LedColorsPublisher::global().await;
led_color_publisher.start();
let _mqtt = MqttRpc::global().await;
tokio::spawn(async move {
let led_color_publisher = ambient_light::LedColorsPublisher::global().await;
led_color_publisher.start().await;
});
let _volume = VolumeManager::global().await;
@ -284,77 +288,87 @@ async fn main() {
let bytes = tokio::task::block_in_place(move || {
tauri::async_runtime::block_on(async move {
let screenshot_manager = ScreenshotManager::global().await;
let channels = screenshot_manager.channels.read().await;
if let Some(rx) = channels.get(&display_id) {
let rx = rx.clone();
let screenshot = rx.borrow().clone();
let bytes = screenshot.bytes.read().await;
let rx: Result<tokio::sync::watch::Receiver<Screenshot>, anyhow::Error> =
screenshot_manager.subscribe_by_display_id(display_id).await;
let (scale_factor_x, scale_factor_y, width, height) = if url.query.is_some()
&& url.query.as_ref().unwrap().contains_key("height")
&& url.query.as_ref().unwrap().contains_key("width")
{
let width = url.query.as_ref().unwrap()["width"]
.parse::<u32>()
.map_err(|err| {
warn!("width parse error: {}", err);
err
})?;
let height = url.query.as_ref().unwrap()["height"]
.parse::<u32>()
.map_err(|err| {
warn!("height parse error: {}", err);
err
})?;
(
screenshot.width as f32 / width as f32,
screenshot.height as f32 / height as f32,
width,
height,
)
} else {
log::debug!("scale by scale_factor");
let scale_factor = screenshot.scale_factor;
(
scale_factor,
scale_factor,
(screenshot.width as f32 / scale_factor) as u32,
(screenshot.height as f32 / scale_factor) as u32,
)
};
log::debug!(
"scale by query. width: {}, height: {}, scale_factor: {}, len: {}",
if let Err(err) = rx {
anyhow::bail!("Display#{}: not found. {}", display_id, err);
}
let mut rx = rx.unwrap();
if rx.changed().await.is_err() {
anyhow::bail!("Display#{}: no more screenshot.", display_id);
}
let screenshot = rx.borrow().clone();
let bytes = screenshot.bytes.read().await;
if bytes.len() == 0 {
anyhow::bail!("Display#{}: no screenshot.", display_id);
}
log::debug!("Display#{}: screenshot size: {}", display_id, bytes.len());
let (scale_factor_x, scale_factor_y, width, height) = if url.query.is_some()
&& url.query.as_ref().unwrap().contains_key("height")
&& url.query.as_ref().unwrap().contains_key("width")
{
let width = url.query.as_ref().unwrap()["width"]
.parse::<u32>()
.map_err(|err| {
warn!("width parse error: {}", err);
err
})?;
let height = url.query.as_ref().unwrap()["height"]
.parse::<u32>()
.map_err(|err| {
warn!("height parse error: {}", err);
err
})?;
(
screenshot.width as f32 / width as f32,
screenshot.height as f32 / height as f32,
width,
height,
screenshot.width as f32 / width as f32,
width * height * 4,
);
let bytes_per_row = screenshot.bytes_per_row as f32;
let mut rgba_buffer = vec![0u8; (width * height * 4) as usize];
for y in 0..height {
for x in 0..width {
let offset = ((y as f32) * scale_factor_y).floor() as usize
* bytes_per_row as usize
+ ((x as f32) * scale_factor_x).floor() as usize * 4;
let b = bytes[offset];
let g = bytes[offset + 1];
let r = bytes[offset + 2];
let a = bytes[offset + 3];
let offset_2 = (y * width + x) as usize * 4;
rgba_buffer[offset_2] = r;
rgba_buffer[offset_2 + 1] = g;
rgba_buffer[offset_2 + 2] = b;
rgba_buffer[offset_2 + 3] = a;
}
}
Ok(rgba_buffer.clone())
)
} else {
anyhow::bail!("Display#{}: not found", display_id);
log::debug!("scale by scale_factor");
let scale_factor = screenshot.scale_factor;
(
scale_factor,
scale_factor,
(screenshot.width as f32 / scale_factor) as u32,
(screenshot.height as f32 / scale_factor) as u32,
)
};
log::debug!(
"scale by query. width: {}, height: {}, scale_factor: {}, len: {}",
width,
height,
screenshot.width as f32 / width as f32,
width * height * 4,
);
let bytes_per_row = screenshot.bytes_per_row as f32;
let mut rgba_buffer = vec![0u8; (width * height * 4) as usize];
for y in 0..height {
for x in 0..width {
let offset = ((y as f32) * scale_factor_y).floor() as usize
* bytes_per_row as usize
+ ((x as f32) * scale_factor_x).floor() as usize * 4;
let b = bytes[offset];
let g = bytes[offset + 1];
let r = bytes[offset + 2];
let a = bytes[offset + 3];
let offset_2 = (y * width + x) as usize * 4;
rgba_buffer[offset_2] = r;
rgba_buffer[offset_2 + 1] = g;
rgba_buffer[offset_2 + 2] = b;
rgba_buffer[offset_2 + 3] = a;
}
}
Ok(rgba_buffer.clone())
})
});
@ -375,8 +389,7 @@ async fn main() {
let app_handle = app.handle().clone();
tokio::spawn(async move {
let config_manager = ambient_light::ConfigManager::global().await;
let config_update_receiver = config_manager.clone_config_update_receiver();
let mut config_update_receiver = config_update_receiver;
let mut config_update_receiver = config_manager.clone_config_update_receiver();
loop {
if let Err(err) = config_update_receiver.changed().await {
error!("config update receiver changed error: {}", err);
@ -457,7 +470,7 @@ async fn main() {
let app_handle = app.handle().clone();
tokio::spawn(async move {
let display_manager = DisplayManager::global().await;
let mut rx =display_manager.subscribe_displays_changed();
let mut rx = display_manager.subscribe_displays_changed();
while rx.changed().await.is_ok() {
let displays = rx.borrow().clone();

View File

@ -3,15 +3,22 @@ use std::{sync::Arc, time::Duration};
use paris::{error, info, warn};
use tokio::{io, net::UdpSocket, sync::RwLock, task::yield_now, time::timeout};
use crate::rpc::DisplaySettingRequest;
use crate::{
ambient_light::{ConfigManager, LedStripConfig},
rpc::DisplaySettingRequest,
volume::{self, VolumeManager},
};
use super::{BoardConnectStatus, BoardInfo};
use super::{BoardConnectStatus, BoardInfo, BoardMessageChannels};
#[derive(Debug)]
pub struct Board {
pub info: Arc<RwLock<BoardInfo>>,
socket: Option<Arc<UdpSocket>>,
listen_handler: Option<tokio::task::JoinHandle<()>>,
volume_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
state_of_displays_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
led_strip_config_changed_subscriber_handler: Option<tokio::task::JoinHandle<()>>,
}
impl Board {
@ -20,25 +27,23 @@ impl Board {
info: Arc::new(RwLock::new(info)),
socket: None,
listen_handler: None,
volume_changed_subscriber_handler: None,
state_of_displays_changed_subscriber_handler: None,
led_strip_config_changed_subscriber_handler: None,
}
}
pub async fn init_socket(&mut self) -> anyhow::Result<()> {
let info = self.info.read().await;
let info = self.info.clone();
let info = info.read().await;
let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket.connect((info.address, info.port)).await?;
let socket = Arc::new(socket);
self.socket = Some(socket.clone());
let info = self.info.clone();
let handler = tokio::spawn(async move {
let mut buf = [0u8; 128];
if let Err(err) = socket.readable().await {
error!("socket read error: {:?}", err);
return;
}
let board_message_channels = crate::rpc::channels::BoardMessageChannels::global().await;
@ -49,7 +54,7 @@ impl Board {
board_message_channels.volume_setting_request_sender.clone();
loop {
match socket.try_recv(&mut buf) {
match socket.recv(&mut buf).await {
Ok(len) => {
log::info!("recv: {:?}", &buf[..len]);
if buf[0] == 3 {
@ -64,6 +69,9 @@ impl Board {
}
} else if buf[0] == 4 {
let result = volume_setting_request_sender.send(buf[1] as f32 / 100.0);
if let Err(err) = result {
error!("send volume setting request to channel failed: {:?}", err);
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
@ -79,9 +87,156 @@ impl Board {
});
self.listen_handler = Some(handler);
self.subscribe_volume_changed().await;
self.subscribe_state_of_displays_changed().await;
self.subscribe_led_strip_config_changed().await;
Ok(())
}
async fn subscribe_volume_changed(&mut self) {
let channel = BoardMessageChannels::global().await;
let mut volume_changed_rx = channel.volume_changed_sender.subscribe();
let info = self.info.clone();
let socket = self.socket.clone();
let handler = tokio::spawn(async move {
loop {
let volume: Result<f32, tokio::sync::broadcast::error::RecvError> =
volume_changed_rx.recv().await;
if let Err(err) = volume {
match err {
tokio::sync::broadcast::error::RecvError::Closed => {
log::error!("volume changed channel closed");
break;
}
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
log::info!("volume changed channel lagged");
continue;
}
}
}
let volume = volume.unwrap();
let info = info.read().await;
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
log::info!("board is not connected, skip send volume changed");
continue;
}
let socket = socket.as_ref().unwrap();
let mut buf = [0u8; 2];
buf[0] = 4;
buf[1] = (volume * 100.0) as u8;
if let Err(err) = socket.send(&buf).await {
log::warn!("send volume changed failed: {:?}", err);
}
}
});
let volume_manager = VolumeManager::global().await;
let volume = volume_manager.get_volume().await;
if let Some(socket) = self.socket.as_ref() {
let buf = [4, (volume * 100.0) as u8];
if let Err(err) = socket.send(&buf).await {
log::warn!("send volume failed: {:?}", err);
}
} else {
log::warn!("socket is none, skip send volume");
}
self.volume_changed_subscriber_handler = Some(handler);
}
async fn subscribe_state_of_displays_changed(&mut self) {
let channel: &BoardMessageChannels = BoardMessageChannels::global().await;
let mut state_of_displays_changed_rx = channel.displays_changed_sender.subscribe();
let info = self.info.clone();
let socket = self.socket.clone();
let handler = tokio::spawn(async move {
loop {
let states: Result<
Vec<crate::display::DisplayState>,
tokio::sync::broadcast::error::RecvError,
> = state_of_displays_changed_rx.recv().await;
if let Err(err) = states {
match err {
tokio::sync::broadcast::error::RecvError::Closed => {
log::error!("state of displays changed channel closed");
break;
}
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
log::info!("state of displays changed channel lagged");
continue;
}
}
}
let info = info.read().await;
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
log::info!("board is not connected, skip send state of displays changed");
continue;
}
let socket = socket.as_ref().unwrap();
let mut buf = [0u8; 3];
let states = states.unwrap();
for (index, state) in states.iter().enumerate() {
buf[0] = 3;
buf[1] = index as u8;
buf[2] = state.brightness as u8;
log::info!("send state of displays changed: {:?}", &buf[..]);
if let Err(err) = socket.send(&buf).await {
log::warn!("send state of displays changed failed: {:?}", err);
}
}
}
});
self.state_of_displays_changed_subscriber_handler = Some(handler);
}
async fn subscribe_led_strip_config_changed(&mut self) {
let config_manager = ConfigManager::global().await;
let mut led_strip_config_changed_rx = config_manager.clone_config_update_receiver();
let info = self.info.clone();
let socket = self.socket.clone();
let handler = tokio::spawn(async move {
while led_strip_config_changed_rx.changed().await.is_ok() {
let config = led_strip_config_changed_rx.borrow().clone();
let info = info.read().await;
if socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
log::info!("board is not connected, skip send led strip config changed");
continue;
}
let socket = socket.as_ref().unwrap();
let mut buf = [0u8; 4];
buf[0] = 5;
buf[1..].copy_from_slice(&config.color_calibration.to_bytes());
log::info!("send led strip config changed: {:?}", &buf[..]);
if let Err(err) = socket.send(&buf).await {
log::warn!("send led strip config changed failed: {:?}", err);
}
}
});
self.led_strip_config_changed_subscriber_handler = Some(handler);
}
pub async fn send_colors(&self, buf: &[u8]) {
let info = self.info.read().await;
if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected {
@ -152,12 +307,22 @@ impl Board {
impl Drop for Board {
fn drop(&mut self) {
info!("board drop");
if let Some(handler) = self.listen_handler.take() {
info!("aborting listen handler");
tokio::task::block_in_place(move || {
handler.abort();
});
info!("listen handler aborted");
handler.abort();
}
if let Some(handler) = self.volume_changed_subscriber_handler.take() {
handler.abort();
}
if let Some(handler) = self.state_of_displays_changed_subscriber_handler.take() {
handler.abort();
}
if let Some(handler) = self.led_strip_config_changed_subscriber_handler.take() {
handler.abort();
}
}
}

View File

@ -2,11 +2,15 @@ use std::sync::Arc;
use tokio::sync::{broadcast, OnceCell};
use crate::display::DisplayState;
use super::DisplaySettingRequest;
pub struct BoardMessageChannels {
pub display_setting_request_sender: Arc<broadcast::Sender<DisplaySettingRequest>>,
pub volume_setting_request_sender: Arc<broadcast::Sender<f32>>,
pub volume_changed_sender: Arc<broadcast::Sender<f32>>,
pub displays_changed_sender: Arc<broadcast::Sender<Vec<DisplayState>>>,
}
impl BoardMessageChannels {
@ -23,9 +27,17 @@ impl BoardMessageChannels {
let (volume_setting_request_sender, _) = broadcast::channel(16);
let volume_setting_request_sender = Arc::new(volume_setting_request_sender);
let (volume_changed_sender, _) = broadcast::channel(2);
let volume_changed_sender = Arc::new(volume_changed_sender);
let (displays_changed_sender, _) = broadcast::channel(2);
let displays_changed_sender = Arc::new(displays_changed_sender);
Self {
display_setting_request_sender,
volume_setting_request_sender,
volume_changed_sender,
displays_changed_sender,
}
}
}

View File

@ -1,12 +1,10 @@
mod board_info;
mod mqtt;
mod udp;
mod board;
mod display_setting_request;
mod channels;
pub use board_info::*;
pub use mqtt::*;
pub use udp::*;
pub use board::*;
pub use display_setting_request::*;

View File

@ -1,163 +0,0 @@
use paho_mqtt as mqtt;
use paris::{info, warn};
use serde_json::json;
use std::time::Duration;
use time::{format_description, OffsetDateTime};
use tokio::{sync::OnceCell, task};
use crate::ambient_light::{ColorCalibration, ConfigManager};
const DISPLAY_TOPIC: &'static str = "display-ambient-light/display";
const DESKTOP_TOPIC: &'static str = "display-ambient-light/desktop";
const COLOR_CALIBRATION: &'static str = "display-ambient-light/desktop/color-calibration";
pub struct MqttRpc {
client: mqtt::AsyncClient,
// change_display_brightness_tx: broadcast::Sender<display::DisplayBrightness>,
// message_tx: broadcast::Sender<models::CmdMqMessage>,
}
impl MqttRpc {
pub async fn global() -> &'static Self {
static MQTT_RPC: OnceCell<MqttRpc> = OnceCell::const_new();
MQTT_RPC
.get_or_init(|| async {
let mqtt_rpc = MqttRpc::new().await.unwrap();
mqtt_rpc.initialize().await.unwrap();
mqtt_rpc
})
.await
}
pub async fn new() -> anyhow::Result<Self> {
let client = mqtt::AsyncClient::new("tcp://192.168.31.11:1883")
.map_err(|err| anyhow::anyhow!("can not create MQTT client. {:?}", err))?;
client.set_connected_callback(|client| {
info!("MQTT server connected.");
client.subscribe("display-ambient-light/board/#", mqtt::QOS_1);
client.subscribe(format!("{}/#", DISPLAY_TOPIC), mqtt::QOS_1);
});
client.set_connection_lost_callback(|_| {
info!("MQTT server connection lost.");
});
client.set_disconnected_callback(|_, a1, a2| {
info!("MQTT server disconnected. {:?} {:?}", a1, a2);
});
let mut last_will_payload = serde_json::Map::new();
last_will_payload.insert("message".to_string(), json!("offline"));
last_will_payload.insert(
"time".to_string(),
serde_json::Value::String(
OffsetDateTime::now_utc()
.format(&time::format_description::well_known::iso8601::Iso8601::DEFAULT)
.unwrap()
.to_string(),
),
);
let last_will = mqtt::Message::new(
format!("{}/status", DESKTOP_TOPIC),
serde_json::to_string(&last_will_payload)
.unwrap()
.as_bytes(),
mqtt::QOS_1,
);
let connect_options = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(5))
.will_message(last_will)
.automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5))
.finalize();
let token = client.connect(connect_options);
token.await.map_err(|err| {
anyhow::anyhow!(
"can not connect MQTT server. wait for connect token failed. {:?}",
err
)
})?;
// let (change_display_brightness_tx, _) =
// broadcast::channel::<display::DisplayBrightness>(16);
// let (message_tx, _) = broadcast::channel::<models::CmdMqMessage>(32);
Ok(Self { client })
}
pub async fn initialize(&self) -> anyhow::Result<()> {
self.broadcast_desktop_online();
Self::publish_color_calibration_worker();
anyhow::Ok(())
}
fn publish_color_calibration_worker() {
tokio::spawn(async move {
let mqtt = Self::global().await;
let config_manager = ConfigManager::global().await;
let mut config_receiver = config_manager.clone_config_update_receiver();
let config = config_manager.configs().await;
if let Err(err) = mqtt
.publish_color_calibration(config.color_calibration)
.await
{
warn!("can not publish color calibration. {}", err);
}
while config_receiver.changed().await.is_ok() {
let config = config_receiver.borrow().clone();
if let Err(err) = mqtt
.publish_color_calibration(config.color_calibration)
.await
{
warn!("can not publish color calibration. {}", err);
}
}
});
}
fn broadcast_desktop_online(&self) {
let client = self.client.to_owned();
task::spawn(async move {
loop {
match OffsetDateTime::now_utc()
.format(&format_description::well_known::Iso8601::DEFAULT)
{
Ok(now_str) => {
let msg = mqtt::Message::new(
"display-ambient-light/desktop/online",
now_str.as_bytes(),
mqtt::QOS_0,
);
match client.publish(msg).await {
Ok(_) => {}
Err(error) => {
warn!("can not publish last online time. {}", error)
}
}
}
Err(error) => {
warn!("can not get time for now. {}", error);
}
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
});
}
pub async fn publish_color_calibration(&self, payload: ColorCalibration) -> anyhow::Result<()> {
self.client
.publish(mqtt::Message::new(
COLOR_CALIBRATION,
payload.to_bytes(),
mqtt::QOS_1,
))
.await
.map_err(|error| anyhow::anyhow!("mqtt publish color calibration failed. {}", error))
}
}

View File

@ -54,7 +54,7 @@ impl UdpRpc {
}
}
});
let shared_self_for_check = shared_self.clone();
tokio::spawn(async move {
shared_self_for_check.check_boards().await;
@ -99,7 +99,9 @@ impl UdpRpc {
}
if boards.insert(board_info.fullname.clone(), board).is_some() {
info!("added board {:?}", board_info);
info!("replace board {:?}", board_info);
} else {
info!("add board {:?}", board_info);
}
let tx_boards = boards

View File

@ -1,5 +1,6 @@
use std::iter;
use std::fmt::Formatter;
use std::{iter, fmt::Debug};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
@ -7,17 +8,30 @@ use tauri::async_runtime::RwLock;
use crate::{ambient_light::LedStripConfig, led_color::LedColor};
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct Screenshot {
pub display_id: u32,
pub height: u32,
pub width: u32,
pub bytes_per_row: usize,
pub bytes: Arc<RwLock<Vec<u8>>>,
pub bytes: Arc<RwLock<Arc<Vec<u8>>>>,
pub scale_factor: f32,
pub bound_scale_factor: f32,
}
impl Debug for Screenshot {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Screenshot")
.field("display_id", &self.display_id)
.field("height", &self.height)
.field("width", &self.width)
.field("bytes_per_row", &self.bytes_per_row)
.field("scale_factor", &self.scale_factor)
.field("bound_scale_factor", &self.bound_scale_factor)
.finish()
}
}
static SINGLE_AXIS_POINTS: usize = 5;
impl Screenshot {
@ -26,7 +40,7 @@ impl Screenshot {
height: u32,
width: u32,
bytes_per_row: usize,
bytes: Vec<u8>,
bytes: Arc<Vec<u8>>,
scale_factor: f32,
bound_scale_factor: f32,
) -> Self {

View File

@ -1,51 +1,20 @@
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use core_graphics::display::{
kCGNullWindowID, kCGWindowImageDefault, kCGWindowListOptionOnScreenOnly, CGDisplay,
};
use core_graphics::geometry::{CGPoint, CGRect, CGSize};
use paris::warn;
use paris::{info, warn};
use rust_swift_screencapture::display::CGDisplayId;
use tauri::async_runtime::RwLock;
use tokio::sync::{broadcast, watch, OnceCell};
use tokio::time::{self, Duration};
use tokio::sync::{broadcast, watch, Mutex, OnceCell};
use tokio::task::yield_now;
use tokio::time::sleep;
use crate::screenshot::LedSamplePoints;
use crate::{ambient_light::SamplePointMapper, led_color::LedColor, screenshot::Screenshot};
pub fn take_screenshot(display_id: u32, scale_factor: f32) -> anyhow::Result<Screenshot> {
log::debug!("take_screenshot");
let cg_display = CGDisplay::new(display_id);
let cg_image = CGDisplay::screenshot(
cg_display.bounds(),
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID,
kCGWindowImageDefault,
)
.ok_or_else(|| anyhow::anyhow!("Display#{}: take screenshot failed", display_id))?;
let buffer = cg_image.data();
let bytes_per_row = cg_image.bytes_per_row();
let height = cg_image.height();
let width = cg_image.width();
let bytes = buffer.bytes().to_owned();
let cg_display = CGDisplay::new(display_id);
let bound_scale_factor = (cg_display.bounds().size.width / width as f64) as f32;
Ok(Screenshot::new(
display_id,
height as u32,
width as u32,
bytes_per_row,
bytes,
scale_factor,
bound_scale_factor,
))
}
pub fn get_display_colors(
display_id: u32,
sample_points: &Vec<Vec<LedSamplePoints>>,
@ -114,7 +83,7 @@ pub fn get_display_colors(
}
pub struct ScreenshotManager {
pub channels: Arc<RwLock<HashMap<u32, watch::Receiver<Screenshot>>>>,
pub channels: Arc<RwLock<HashMap<u32, Arc<RwLock<watch::Sender<Screenshot>>>>>>,
merged_screenshot_tx: Arc<RwLock<broadcast::Sender<Screenshot>>>,
}
@ -134,74 +103,77 @@ impl ScreenshotManager {
.await
}
pub fn start(&self) -> anyhow::Result<()> {
pub async fn start(&self) -> anyhow::Result<()> {
let displays = display_info::DisplayInfo::all()?;
for display in displays {
self.start_one(display.id, display.scale_factor)?;
}
Ok(())
}
fn start_one(&self, display_id: u32, scale_factor: f32) -> anyhow::Result<()> {
let channels = self.channels.to_owned();
let merged_screenshot_tx = self.merged_screenshot_tx.clone();
tokio::spawn(async move {
let screenshot = take_screenshot(display_id, scale_factor);
if screenshot.is_err() {
warn!("take_screenshot_loop: {}", screenshot.err().unwrap());
return;
}
let mut interval = time::interval(Duration::from_millis(1000));
let screenshot = screenshot.unwrap();
let (screenshot_tx, screenshot_rx) = watch::channel(screenshot);
{
let channels = channels.clone();
let mut channels = channels.write().await;
channels.insert(display_id, screenshot_rx.clone());
}
let merged_screenshot_tx = merged_screenshot_tx.read().await.clone();
loop {
Self::take_screenshot_loop(
display_id,
scale_factor,
&screenshot_tx,
&merged_screenshot_tx,
)
.await;
interval.tick().await;
tokio::time::sleep(Duration::from_millis(1)).await;
}
let futures = displays.iter().map(|display| async {
self.start_one(display.id, display.scale_factor)
.await
.unwrap_or_else(|err| {
warn!("start_one failed: display_id: {}, err: {}", display.id, err);
});
info!("start_one finished: display_id: {}", display.id);
});
futures::future::join_all(futures).await;
Ok(())
}
async fn take_screenshot_loop(
display_id: u32,
scale_factor: f32,
screenshot_tx: &watch::Sender<Screenshot>,
merged_screenshot_tx: &broadcast::Sender<Screenshot>,
) {
let screenshot = take_screenshot(display_id, scale_factor);
if let Ok(screenshot) = screenshot {
match merged_screenshot_tx.send(screenshot.clone()) {
Ok(_) => {
log::info!(
"take_screenshot_loop: merged_screenshot_tx.send success. display#{}",
display_id
);
async fn start_one(&self, display_id: u32, scale_factor: f32) -> anyhow::Result<()> {
let merged_screenshot_tx = self.merged_screenshot_tx.clone();
let (tx, _) = watch::channel(Screenshot::new(
display_id,
0,
0,
0,
Arc::new(vec![]),
scale_factor,
scale_factor,
));
let tx = Arc::new(RwLock::new(tx));
let mut channels = self.channels.write().await;
channels.insert(display_id, tx.clone());
drop(channels);
loop {
let display = rust_swift_screencapture::display::Display::new(display_id);
let mut frame_rx = display.subscribe_frame().await;
display.start_capture(30).await;
let tx_for_send = tx.read().await;
while frame_rx.changed().await.is_ok() {
let frame = frame_rx.borrow().clone();
let screenshot = Screenshot::new(
display_id,
frame.height as u32,
frame.width as u32,
frame.bytes_per_row as usize,
frame.bytes,
scale_factor,
scale_factor,
);
let merged_screenshot_tx = merged_screenshot_tx.write().await;
if let Err(err) = merged_screenshot_tx.send(screenshot.clone()) {
// log::warn!("merged_screenshot_tx.send failed: {}", err);
}
Err(_) => {
if let Err(err) = tx_for_send.send(screenshot.clone()) {
log::warn!("display {} screenshot_tx.send failed: {}", display_id, err);
} else {
log::debug!("screenshot: {:?}", screenshot);
}
yield_now().await;
}
screenshot_tx.send(screenshot).unwrap();
// log::info!("take_screenshot_loop: send success. display#{}", display_id)
} else {
warn!("take_screenshot_loop: {}", screenshot.err().unwrap());
sleep(Duration::from_secs(5)).await;
info!(
"display {} frame_rx.changed() failed, try to restart",
display_id
);
}
}
@ -257,4 +229,16 @@ impl ScreenshotManager {
pub async fn clone_merged_screenshot_rx(&self) -> broadcast::Receiver<Screenshot> {
self.merged_screenshot_tx.read().await.subscribe()
}
pub async fn subscribe_by_display_id(
&self,
display_id: CGDisplayId,
) -> anyhow::Result<watch::Receiver<Screenshot>> {
let channels = self.channels.read().await;
if let Some(tx) = channels.get(&display_id) {
Ok(tx.read().await.subscribe())
} else {
Err(anyhow::anyhow!("display_id: {} not found", display_id))
}
}
}

View File

@ -1,23 +1,22 @@
use std::{
mem,
sync::{Arc, RwLock},
};
use std::{mem, sync::Arc};
use coreaudio::{
audio_unit::macos_helpers::get_default_device_id,
sys::{
kAudioHardwareServiceDeviceProperty_VirtualMasterVolume, kAudioObjectPropertyScopeOutput,
AudioObjectHasProperty, AudioObjectPropertyAddress, AudioObjectSetPropertyData,
AudioObjectGetPropertyData, AudioObjectHasProperty, AudioObjectPropertyAddress,
AudioObjectSetPropertyData,
},
};
use paris::error;
use tokio::sync::OnceCell;
use tokio::sync::{OnceCell, RwLock};
use crate::rpc::BoardMessageChannels;
pub struct VolumeManager {
current_volume: Arc<RwLock<f32>>,
handler: Option<tokio::task::JoinHandle<()>>,
read_handler: Option<tokio::task::JoinHandle<()>>,
}
impl VolumeManager {
@ -33,9 +32,11 @@ impl VolumeManager {
let mut instance = Self {
current_volume: Arc::new(RwLock::new(0.0)),
handler: None,
read_handler: None,
};
instance.subscribe_volume_setting_request();
instance.auto_read_volume();
instance
}
@ -55,6 +56,36 @@ impl VolumeManager {
self.handler = Some(handler);
}
fn auto_read_volume(&mut self) {
let current_volume = self.current_volume.clone();
let handler = tokio::spawn(async move {
let channel = BoardMessageChannels::global().await;
let volume_changed_tx = channel.volume_changed_sender.clone();
loop {
match Self::read_volume() {
Ok(value) => {
let mut volume = current_volume.write().await;
if *volume != value {
if let Err(err) = volume_changed_tx.send(value) {
error!("failed to send volume changed event: {}", err);
}
}
*volume = value;
}
Err(err) => {
error!("failed to read volume: {}", err);
}
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});
self.read_handler = Some(handler);
}
fn set_volume(volume: f32) -> anyhow::Result<()> {
log::debug!("set volume: {}", volume);
@ -98,4 +129,75 @@ impl VolumeManager {
Ok(())
}
fn read_volume() -> anyhow::Result<f32> {
let device_id = get_default_device_id(false);
if device_id.is_none() {
anyhow::bail!("default audio output device is not found.");
}
let device_id = device_id.unwrap();
let address = AudioObjectPropertyAddress {
mSelector: kAudioHardwareServiceDeviceProperty_VirtualMasterVolume,
mScope: kAudioObjectPropertyScopeOutput,
mElement: 0,
};
log::debug!("device id: {}", device_id);
log::debug!("address: {:?}", address);
if 0 == unsafe { AudioObjectHasProperty(device_id, &address) } {
anyhow::bail!("Can not get audio property");
}
let mut size = mem::size_of::<f32>() as u32;
let mut volume = 0.0f32;
let result = unsafe {
AudioObjectGetPropertyData(
device_id,
&address,
0,
std::ptr::null(),
&mut size,
&mut volume as *mut f32 as *mut std::ffi::c_void,
)
};
if result != 0 {
anyhow::bail!("Can not get audio property. result: {}", result);
}
if size != mem::size_of::<f32>() as u32 {
anyhow::bail!("Can not get audio property. data size is not matched.");
}
log::debug!("current system volume of primary device: {}", volume);
Ok(volume)
}
pub async fn get_volume(&self) -> f32 {
self.current_volume.read().await.clone()
}
}
impl Drop for VolumeManager {
fn drop(&mut self) {
log::info!("drop volume manager");
if let Some(handler) = self.handler.take() {
tokio::task::block_in_place(move || {
handler.abort();
});
}
if let Some(handler) = self.read_handler.take() {
tokio::task::block_in_place(move || {
handler.abort();
});
}
}
}

View File

@ -28,7 +28,10 @@
"icons/icon.ico"
],
"identifier": "cc.ivanli.ambient-light.desktop",
"targets": "all"
"targets": "all",
"macOS": {
"minimumSystemVersion": "13"
}
},
"security": {
"csp": null