From 11045f27d8783e531c1be8496bd32cacc84ed39d Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sat, 29 Apr 2023 18:07:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=80=9A=E8=BF=87=E6=96=B0=E7=9A=84=20?= =?UTF-8?q?udp=20=E9=80=BB=E8=BE=91=E5=8F=91=E9=80=81=E7=81=AF=E5=B8=A6?= =?UTF-8?q?=E9=A2=9C=E8=89=B2=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-tauri/Cargo.lock | 1 + src-tauri/Cargo.toml | 1 + src-tauri/src/ambient_light/publisher.rs | 13 ++++++-- src-tauri/src/rpc/udp.rs | 39 +++++++++++++++++++----- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index b3f9883..4294b80 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -2913,6 +2913,7 @@ dependencies = [ "core-graphics", "display-info", "env_logger", + "futures", "hex", "itertools", "log", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index c4641eb..38390b5 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -34,6 +34,7 @@ itertools = "0.10.5" core-foundation = "0.9.3" tokio-stream = "0.1.14" mdns-sd = "0.7.2" +futures = "0.3.28" [features] # this feature is used for production builds or when `devPath` points to the filesystem diff --git a/src-tauri/src/ambient_light/publisher.rs b/src-tauri/src/ambient_light/publisher.rs index bfbcde8..e72b619 100644 --- a/src-tauri/src/ambient_light/publisher.rs +++ b/src-tauri/src/ambient_light/publisher.rs @@ -12,7 +12,7 @@ use crate::{ ambient_light::{config, ConfigManager}, led_color::LedColor, screenshot::LedSamplePoints, - screenshot_manager::{self, ScreenshotManager}, + screenshot_manager::{self, ScreenshotManager}, rpc::UdpRpc, }; use itertools::Itertools; @@ -292,7 +292,13 @@ impl LedColorsPublisher { .min() .unwrap(); - let socket = UdpSocket::bind("0.0.0.0:0").await?; + let udp_rpc = UdpRpc::global().await; + if let Err(err) = udp_rpc { + warn!("udp_rpc can not be initialized: {}", err); + } + let udp_rpc = udp_rpc.as_ref().unwrap(); + + // let socket = UdpSocket::bind("0.0.0.0:0").await?; for group in mappers.clone() { if (group.start.abs_diff(group.end)) > colors.len() { return Err(anyhow::anyhow!( @@ -327,7 +333,8 @@ impl LedColorsPublisher { tx_buffer.push((offset >> 8) as u8); tx_buffer.push((offset & 0xff) as u8); tx_buffer.append(&mut buffer); - socket.send_to(&tx_buffer, "192.168.31.206:23042").await?; + + udp_rpc.send_to_all(&tx_buffer).await?; } Ok(()) diff --git a/src-tauri/src/rpc/udp.rs b/src-tauri/src/rpc/udp.rs index 08af6ca..65174d1 100644 --- a/src-tauri/src/rpc/udp.rs +++ b/src-tauri/src/rpc/udp.rs @@ -1,18 +1,20 @@ use std::{collections::HashSet, sync::Arc, time::Duration}; +use futures::future::join_all; +use itertools::Itertools; use mdns_sd::{ServiceDaemon, ServiceEvent}; use paris::{error, info, warn}; use tokio::{ net::UdpSocket, - sync::{watch, Mutex, OnceCell}, + sync::{watch, Mutex, OnceCell, RwLock}, }; use super::BoardInfo; #[derive(Debug, Clone)] pub struct UdpRpc { - socket: Arc>, - boards: Arc>>, + socket: Arc, + boards: Arc>>, boards_change_sender: Arc>>>, boards_change_receiver: Arc>>>, } @@ -32,8 +34,8 @@ impl UdpRpc { async fn new() -> anyhow::Result { let socket = UdpSocket::bind("0.0.0.0:0").await?; - let socket = Arc::new(Mutex::new(socket)); - let boards = Arc::new(Mutex::new(HashSet::new())); + let socket = Arc::new(socket); + let boards = Arc::new(RwLock::new(HashSet::new())); let (boards_change_sender, boards_change_receiver) = watch::channel(HashSet::new()); let boards_change_sender = Arc::new(Mutex::new(boards_change_sender)); let boards_change_receiver = Arc::new(Mutex::new(boards_change_receiver)); @@ -84,7 +86,7 @@ impl UdpRpc { ); let shared_self = shared_self.lock().await; - let mut boards = shared_self.boards.clone().lock_owned().await; + let mut boards = shared_self.boards.write().await; let board = BoardInfo { name: info.get_fullname().to_string(), @@ -116,7 +118,30 @@ impl UdpRpc { } pub async fn get_boards(&self) -> HashSet { - let boards = self.boards.clone().lock_owned().await; + let boards = self.boards.read().await; boards.clone() } + + pub async fn send_to_all(&self, buff: &Vec) -> anyhow::Result<()> { + let boards = self.get_boards().await; + let socket = self.socket.clone(); + + let handlers = boards.into_iter() + .map(|board| { + let socket = socket.clone(); + let buff = buff.clone(); + tokio::spawn(async move { + match socket.send_to(&buff, (board.address, board.port)).await { + Ok(_) => {}, + Err(err) => { + error!("failed to send to {}: {:?}", board.name, err); + }, + } + }) + }); + + join_all(handlers).await; + + Ok(()) + } }