diff --git a/src-tauri/src/rpc/board.rs b/src-tauri/src/rpc/board.rs new file mode 100644 index 0000000..c64bf1c --- /dev/null +++ b/src-tauri/src/rpc/board.rs @@ -0,0 +1,101 @@ +use std::time::Duration; + +use paris::{info, warn}; +use tokio::{net::UdpSocket, sync::RwLock, time::timeout}; + +use super::{BoardConnectStatus, BoardInfo}; + +#[derive(Debug)] +pub struct Board { + pub info: RwLock, + socket: Option, +} + +impl Board { + pub fn new(info: BoardInfo) -> Self { + Self { + info: RwLock::new(info), + socket: None, + } + } + + pub async fn init_socket(&mut self) -> anyhow::Result<()> { + let info = self.info.read().await; + let socket = UdpSocket::bind("0.0.0.0:0").await?; + + socket.connect((info.address, info.port)).await?; + self.socket = Some(socket); + + Ok(()) + } + + pub async fn send_colors(&self, buf: &[u8]) { + let info = self.info.read().await; + if self.socket.is_none() || info.connect_status != BoardConnectStatus::Connected { + return; + } + + let socket = self.socket.as_ref().unwrap(); + + socket + .send(buf) + .await + .unwrap(); + } + + pub async fn check(&self) -> anyhow::Result<()> { + let info = self.info.read().await; + let socket = UdpSocket::bind("0.0.0.0:0").await?; + socket.connect((info.address, info.port)).await?; + drop(info); + + let instant = std::time::Instant::now(); + + socket.send(&[1]).await?; + let mut buf = [0u8; 1]; + let recv_future = socket.recv(&mut buf); + + let check_result = timeout(Duration::from_secs(1), recv_future).await; + let mut info = self.info.write().await; + match check_result { + Ok(_) => { + let ttl = instant.elapsed(); + if buf == [1] { + info.connect_status = BoardConnectStatus::Connected; + } else { + if let BoardConnectStatus::Connecting(retry) = info.connect_status { + if retry < 10 { + info.connect_status = BoardConnectStatus::Connecting(retry + 1); + info!("reconnect: {}", retry + 1); + } else { + info.connect_status = BoardConnectStatus::Disconnected; + warn!("board Disconnected: bad pong."); + } + } else if info.connect_status != BoardConnectStatus::Disconnected { + info.connect_status = BoardConnectStatus::Connecting(1); + } + } + info.ttl = Some(ttl.as_millis()); + } + Err(_) => { + if let BoardConnectStatus::Connecting(retry) = info.connect_status { + if retry < 10 { + info.connect_status = BoardConnectStatus::Connecting(retry + 1); + info!("reconnect: {}", retry + 1); + } else { + info.connect_status = BoardConnectStatus::Disconnected; + warn!("board Disconnected: timeout"); + } + } else if info.connect_status != BoardConnectStatus::Disconnected { + info.connect_status = BoardConnectStatus::Connecting(1); + } + + info.ttl = None; + } + } + + info.checked_at = Some(std::time::SystemTime::now()); + + Ok(()) + } +} diff --git a/src-tauri/src/rpc/board_info.rs b/src-tauri/src/rpc/board_info.rs index bb3a290..d00df43 100644 --- a/src-tauri/src/rpc/board_info.rs +++ b/src-tauri/src/rpc/board_info.rs @@ -1,8 +1,6 @@ use std::{net::Ipv4Addr, time::Duration}; -use paris::{warn, info}; use serde::{Deserialize, Serialize}; -use tokio::{net::UdpSocket, time::timeout}; #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] pub enum BoardConnectStatus { @@ -35,56 +33,4 @@ impl BoardInfo { ttl: None, } } - - pub async fn check(&mut self) -> anyhow::Result<()> { - let socket = UdpSocket::bind("0.0.0.0:0").await?; - socket.connect((self.address, self.port)).await?; - - let instant = std::time::Instant::now(); - - socket.send(&[1]).await?; - let mut buf = [0u8; 1]; - let recv_future = socket.recv(&mut buf); - - match timeout(Duration::from_secs(1), recv_future).await { - Ok(_) => { - let ttl = instant.elapsed(); - if buf == [1] { - self.connect_status = BoardConnectStatus::Connected; - } else { - if let BoardConnectStatus::Connecting(retry) = self.connect_status { - if retry < 10 { - self.connect_status = BoardConnectStatus::Connecting(retry + 1); - info!("reconnect: {}", retry + 1); - } else { - self.connect_status = BoardConnectStatus::Disconnected; - warn!("board Disconnected: bad pong."); - } - } else if self.connect_status != BoardConnectStatus::Disconnected { - self.connect_status = BoardConnectStatus::Connecting(1); - } - } - self.ttl = Some(ttl.as_millis()); - } - Err(_) => { - if let BoardConnectStatus::Connecting(retry) = self.connect_status { - if retry < 10 { - self.connect_status = BoardConnectStatus::Connecting(retry + 1); - info!("reconnect: {}", retry + 1); - } else { - self.connect_status = BoardConnectStatus::Disconnected; - warn!("board Disconnected: timeout"); - } - } else if self.connect_status != BoardConnectStatus::Disconnected { - self.connect_status = BoardConnectStatus::Connecting(1); - } - - self.ttl = None; - } - } - - self.checked_at = Some(std::time::SystemTime::now()); - - Ok(()) - } } diff --git a/src-tauri/src/rpc/mod.rs b/src-tauri/src/rpc/mod.rs index b82ffdd..b2f3cd5 100644 --- a/src-tauri/src/rpc/mod.rs +++ b/src-tauri/src/rpc/mod.rs @@ -1,7 +1,9 @@ mod board_info; mod mqtt; mod udp; +mod board; pub use board_info::*; pub use mqtt::*; pub use udp::*; +pub use board::*; diff --git a/src-tauri/src/rpc/udp.rs b/src-tauri/src/rpc/udp.rs index ad6dfc6..3d6f5b7 100644 --- a/src-tauri/src/rpc/udp.rs +++ b/src-tauri/src/rpc/udp.rs @@ -8,12 +8,11 @@ use tokio::{ sync::{watch, OnceCell, RwLock}, }; -use super::{BoardInfo, BoardConnectStatus}; +use super::{Board, BoardInfo}; #[derive(Debug, Clone)] pub struct UdpRpc { - socket: Arc, - boards: Arc>>, + boards: Arc>>, boards_change_sender: Arc>>, } @@ -31,14 +30,11 @@ impl UdpRpc { } async fn new() -> anyhow::Result { - let socket = UdpSocket::bind("0.0.0.0:0").await?; - let socket = Arc::new(socket); let boards = Arc::new(RwLock::new(HashMap::new())); let (boards_change_sender, _) = watch::channel(Vec::new()); let boards_change_sender = Arc::new(boards_change_sender); Ok(Self { - socket, boards, boards_change_sender, }) @@ -91,40 +87,55 @@ impl UdpRpc { let mut boards = self.boards.write().await; - let board = BoardInfo::new( - info.get_fullname().to_string(), + let board_info = BoardInfo::new( info.get_fullname().to_string(), + info.get_hostname().to_string(), info.get_addresses().iter().next().unwrap().clone(), info.get_port(), ); - if boards.insert(board.fullname.clone(), board.clone()).is_some() { - info!("added board {:?}", board); + let mut board = Board::new(board_info.clone()); + + if let Err(err) = board.init_socket().await { + error!("failed to init socket: {:?}", err); + continue; } - let tx_boards = boards.values().cloned().collect(); + if boards.insert(board_info.fullname.clone(), board).is_some() { + info!("added board {:?}", board_info); + } + + let tx_boards = boards + .values() + .map(|it| async move { it.info.read().await.clone() }); + let tx_boards = join_all(tx_boards).await; + drop(boards); sender.send(tx_boards)?; - tokio::task::yield_now().await; } ServiceEvent::ServiceRemoved(_, fullname) => { - info!("removed board {:?}", fullname); + info!("removed board {:?}", fullname); let mut boards = self.boards.write().await; if boards.remove(&fullname).is_some() { info!("removed board {:?} successful", fullname); } - let tx_boards = boards.values().cloned().collect(); + let tx_boards = boards + .values() + .map(|it| async move { it.info.read().await.clone() }); + let tx_boards = join_all(tx_boards).await; + drop(boards); sender.send(tx_boards)?; - tokio::task::yield_now().await; } other_event => { // log::info!("{:?}", &other_event); } } + + tokio::task::yield_now().await; } Ok(()) @@ -135,34 +146,38 @@ impl UdpRpc { } pub async fn get_boards(&self) -> Vec { - let boards = self.boards.read().await; - boards.values().cloned().collect() + self.boards_change_sender.borrow().clone() } pub async fn send_to_all(&self, buff: &Vec) -> anyhow::Result<()> { - let boards = self.get_boards().await; - let socket = self.socket.clone(); + let boards = self.boards.read().await; - let handlers = boards.into_iter().map(|board| { - if board.connect_status == BoardConnectStatus::Disconnected { - return tokio::spawn(async move { - log::debug!("board {} is disconnected, skip.", board.host); - }); - } + for board in boards.values() { + board.send_colors(buff).await; + } - let socket = socket.clone(); - let buff = buff.clone(); - tokio::spawn(async move { - match socket.send_to(&buff, (board.address, board.port)).await { - Ok(_) => {} - Err(err) => { - error!("failed to send to {}: {:?}", board.host, err); - } - } - }) - }); + // let socket = self.socket.clone(); - join_all(handlers).await; + // let handlers = boards.into_iter().map(|board| { + // if board.connect_status == BoardConnectStatus::Disconnected { + // return tokio::spawn(async move { + // log::debug!("board {} is disconnected, skip.", board.host); + // }); + // } + + // let socket = socket.clone(); + // let buff = buff.clone(); + // tokio::spawn(async move { + // match socket.send_to(&buff, (board.address, board.port)).await { + // Ok(_) => {} + // Err(err) => { + // error!("failed to send to {}: {:?}", board.host, err); + // } + // } + // }) + // }); + + // join_all(handlers).await; Ok(()) } @@ -170,28 +185,35 @@ impl UdpRpc { pub async fn check_boards(&self) { let mut interval = tokio::time::interval(Duration::from_secs(1)); loop { - let mut boards = self.boards.clone().write_owned().await; + tokio::task::yield_now().await; + interval.tick().await; + + let boards = self.boards.read().await; if boards.is_empty() { info!("no boards found"); continue; } - for (_, board) in boards.iter_mut() { + for board in boards.values() { if let Err(err) = board.check().await { - error!("failed to check board {}: {:?}", board.host, err); + error!("failed to check board: {:?}", err); } } - let board_vec = boards.values().cloned().collect::>(); + let tx_boards = boards + .values() + .map(|it| async move { it.info.read().await.clone() }); + let tx_boards = join_all(tx_boards).await; + drop(boards); let board_change_sender = self.boards_change_sender.clone(); - if let Err(err) = board_change_sender.send(board_vec) { + if let Err(err) = board_change_sender.send(tx_boards) { error!("failed to send board change: {:?}", err); } + drop(board_change_sender); - interval.tick().await; } } } diff --git a/src/components/info/board-info-panel.tsx b/src/components/info/board-info-panel.tsx index d3b5a7e..a153d3c 100644 --- a/src/components/info/board-info-panel.tsx +++ b/src/components/info/board-info-panel.tsx @@ -43,6 +43,7 @@ export const BoardInfoPanel: Component<{ board: BoardInfo }> = (props) => { return (
+ {props.board.fullname} {props.board.host} {props.board.address} diff --git a/src/models/board-info.model.ts b/src/models/board-info.model.ts index e43e186..7ef8534 100644 --- a/src/models/board-info.model.ts +++ b/src/models/board-info.model.ts @@ -1,4 +1,5 @@ export type BoardInfo = { + fullname: string; host: string; address: string; port: number;