From 6c90a5e655128c7b3d8681905356028ecc0b526f Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 30 Apr 2023 18:44:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E5=92=8C=E6=9F=A5=E7=9C=8B=E6=9D=BF=E5=AD=90=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E7=9A=84=E6=83=85=E5=86=B5=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-tauri/src/main.rs | 3 +- src-tauri/src/rpc/board_info.rs | 57 ++++++++++- src-tauri/src/rpc/udp.rs | 121 +++++++++++++++-------- src/components/info/board-index.tsx | 16 +-- src/components/info/board-info-panel.tsx | 49 +++++++++ src/models/board-info.model.ts | 5 +- 6 files changed, 193 insertions(+), 58 deletions(-) create mode 100644 src/components/info/board-info-panel.tsx diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 8b78ed5..aad2db6 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -419,12 +419,13 @@ async fn main() { loop { match UdpRpc::global().await { Ok(udp_rpc) => { - let mut receiver = udp_rpc.clone_boards_change_receiver().await; + let mut receiver = udp_rpc.subscribe_boards_change(); loop { if let Err(err) = receiver.changed().await { error!("boards change receiver changed error: {}", err); return; } + log::info!("boards changed"); let boards = receiver.borrow().clone(); diff --git a/src-tauri/src/rpc/board_info.rs b/src-tauri/src/rpc/board_info.rs index a8ad36c..f2ec2f1 100644 --- a/src-tauri/src/rpc/board_info.rs +++ b/src-tauri/src/rpc/board_info.rs @@ -1,10 +1,61 @@ -use std::net::{Ipv4Addr}; +use std::{net::Ipv4Addr, time::Duration}; +use paris::warn; use serde::{Deserialize, Serialize}; +use tokio::{net::UdpSocket, time::timeout}; #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct BoardInfo { - pub name: String, + pub host: String, pub address: Ipv4Addr, pub port: u16, -} \ No newline at end of file + pub is_online: bool, + pub checked_at: Option, + pub ttl: Option, +} + +impl BoardInfo { + pub fn new(host: String, address: Ipv4Addr, port: u16) -> Self { + Self { + host, + address, + port, + is_online: false, + checked_at: None, + ttl: None, + } + } + + pub async fn check(&mut self) -> anyhow::Result<()> { + let socket = UdpSocket::bind("0.0.0.0:0").await?; + socket.connect((self.address, self.port)).await?; + + let instant = std::time::Instant::now(); + + socket.send(&[1]).await?; + let mut buf = [0u8; 1]; + let recv_future = socket.recv(&mut buf); + + match timeout(Duration::from_secs(5), recv_future).await { + Ok(_) => { + let ttl = instant.elapsed(); + log::info!("buf: {:?}", buf); + if buf == [1] { + self.is_online = true; + } else { + self.is_online = false; + } + self.ttl = Some(ttl.as_millis()); + } + Err(_) => { + warn!("timeout"); + self.is_online = false; + self.ttl = None; + } + } + + self.checked_at = Some(std::time::SystemTime::now()); + + Ok(()) + } +} diff --git a/src-tauri/src/rpc/udp.rs b/src-tauri/src/rpc/udp.rs index 65174d1..4ed99e2 100644 --- a/src-tauri/src/rpc/udp.rs +++ b/src-tauri/src/rpc/udp.rs @@ -1,12 +1,11 @@ use std::{collections::HashSet, sync::Arc, time::Duration}; use futures::future::join_all; -use itertools::Itertools; use mdns_sd::{ServiceDaemon, ServiceEvent}; use paris::{error, info, warn}; use tokio::{ net::UdpSocket, - sync::{watch, Mutex, OnceCell, RwLock}, + sync::{watch, OnceCell, RwLock}, }; use super::BoardInfo; @@ -15,8 +14,7 @@ use super::BoardInfo; pub struct UdpRpc { socket: Arc, boards: Arc>>, - boards_change_sender: Arc>>>, - boards_change_receiver: Arc>>>, + boards_change_sender: Arc>>, } impl UdpRpc { @@ -36,22 +34,23 @@ impl UdpRpc { let socket = UdpSocket::bind("0.0.0.0:0").await?; let socket = Arc::new(socket); let boards = Arc::new(RwLock::new(HashSet::new())); - let (boards_change_sender, boards_change_receiver) = watch::channel(HashSet::new()); - let boards_change_sender = Arc::new(Mutex::new(boards_change_sender)); - let boards_change_receiver = Arc::new(Mutex::new(boards_change_receiver)); + let (boards_change_sender, _) = watch::channel(Vec::new()); + let boards_change_sender = Arc::new(boards_change_sender); + Ok(Self { socket, boards, boards_change_sender, - boards_change_receiver, }) } async fn initialize(&self) { let shared_self = Arc::new(self.clone()); + + let shared_self_for_search = shared_self.clone(); tokio::spawn(async move { loop { - match shared_self.search_boards().await { + match shared_self_for_search.search_boards().await { Ok(_) => { info!("search_boards finished"); } @@ -62,44 +61,63 @@ impl UdpRpc { } } }); + + let shared_self_for_check = shared_self.clone(); + tokio::spawn(async move { + shared_self_for_check.check_boards().await; + }); + + // let shared_self_for_watch = shared_self.clone(); + // tokio::spawn(async move { + // let mut rx = shared_self_for_watch.clone_boards_change_receiver().await; + + // // let mut rx = sub_tx.subscribe(); + // // drop(sub_tx); + // while rx.changed().await.is_ok() { + // let boards = rx.borrow().clone(); + // info!("boards changed: {:?}", boards); + // } + // }); } async fn search_boards(&self) -> anyhow::Result<()> { let service_type = "_ambient_light._udp.local."; let mdns = ServiceDaemon::new()?; - let shared_self = Arc::new(Mutex::new(self.clone())); let receiver = mdns.browse(&service_type).map_err(|e| { warn!("Failed to browse for {:?}: {:?}", service_type, e); e })?; + let sender = self.boards_change_sender.clone(); while let Ok(event) = receiver.recv() { match event { ServiceEvent::ServiceResolved(info) => { info!( - "Resolved a new service: {} host: {} port: {} IP: {:?} TXT properties: {:?}", - info.get_fullname(), - info.get_hostname(), - info.get_port(), - info.get_addresses(), - info.get_properties(), - ); + "Resolved a new service: {} host: {} port: {} IP: {:?} TXT properties: {:?}", + info.get_fullname(), + info.get_hostname(), + info.get_port(), + info.get_addresses(), + info.get_properties(), + ); - let shared_self = shared_self.lock().await; - let mut boards = shared_self.boards.write().await; + let mut boards = self.boards.write().await; - let board = BoardInfo { - name: info.get_fullname().to_string(), - address: info.get_addresses().iter().next().unwrap().clone(), - port: info.get_port(), - }; + let board = BoardInfo::new( + info.get_fullname().to_string(), + info.get_addresses().iter().next().unwrap().clone(), + info.get_port(), + ); if boards.insert(board.clone()) { info!("added board {:?}", board); } - let sender = self.boards_change_sender.clone().lock_owned().await; - sender.send(boards.clone())?; + let tx_boards = boards.iter().cloned().collect(); + drop(boards); + + sender.send(tx_boards)?; + tokio::task::yield_now().await; } other_event => { warn!("{:?}", &other_event); @@ -110,32 +128,28 @@ impl UdpRpc { Ok(()) } - pub async fn clone_boards_change_receiver( - &self, - ) -> watch::Receiver> { - let boards_change_receiver = self.boards_change_receiver.clone().lock_owned().await; - boards_change_receiver.clone() + pub fn subscribe_boards_change(&self) -> watch::Receiver> { + self.boards_change_sender.subscribe() } - pub async fn get_boards(&self) -> HashSet { - let boards = self.boards.read().await; - boards.clone() + pub async fn get_boards(&self) -> Vec { + let boards: tokio::sync::RwLockReadGuard> = self.boards.read().await; + boards.iter().cloned().collect() } pub async fn send_to_all(&self, buff: &Vec) -> anyhow::Result<()> { let boards = self.get_boards().await; let socket = self.socket.clone(); - - let handlers = boards.into_iter() - .map(|board| { + + let handlers = boards.into_iter().map(|board| { let socket = socket.clone(); let buff = buff.clone(); tokio::spawn(async move { match socket.send_to(&buff, (board.address, board.port)).await { - Ok(_) => {}, + Ok(_) => {} Err(err) => { - error!("failed to send to {}: {:?}", board.name, err); - }, + error!("failed to send to {}: {:?}", board.host, err); + } } }) }); @@ -144,4 +158,31 @@ impl UdpRpc { Ok(()) } + + pub async fn check_boards(&self) { + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let mut boards = self.get_boards().await; + + if boards.is_empty() { + info!("no boards found"); + continue; + } + + for board in &mut boards { + if let Err(err) = board.check().await { + error!("failed to check board {}: {:?}", board.host, err); + } + } + + let board_change_sender = self.boards_change_sender.clone(); + if let Err(err) = board_change_sender.send(boards) { + error!("failed to send board change: {:?}", err); + } else { + info!("send"); + } + drop(board_change_sender); + } + } } diff --git a/src/components/info/board-index.tsx b/src/components/info/board-index.tsx index e462cbe..d769114 100644 --- a/src/components/info/board-index.tsx +++ b/src/components/info/board-index.tsx @@ -3,6 +3,7 @@ import { BoardInfo } from '../../models/board-info.model'; import { listen } from '@tauri-apps/api/event'; import debug from 'debug'; import { invoke } from '@tauri-apps/api'; +import { BoardInfoPanel } from './board-info-panel'; const logger = debug('app:components:info:board-index'); @@ -28,19 +29,8 @@ export const BoardIndex: Component = () => {
    {(board, index) => ( -
  1. -
    -
    host
    -
    {board.name}
    -
    -
    -
    Ip Addr
    -
    {board.address}
    -
    -
    -
    Port
    -
    {board.port}
    -
    +
  2. + #{index() + 1} diff --git a/src/components/info/board-info-panel.tsx b/src/components/info/board-info-panel.tsx new file mode 100644 index 0000000..56fb949 --- /dev/null +++ b/src/components/info/board-info-panel.tsx @@ -0,0 +1,49 @@ +import { Component, ParentComponent, createMemo } from 'solid-js'; +import { BoardInfo } from '../../models/board-info.model'; + +type ItemProps = { + label: string; +}; + +const Item: ParentComponent = (props) => { + return ( +
    +
    {props.label}
    +
    {props.children}
    +
    + ); +}; + +export const BoardInfoPanel: Component<{ board: BoardInfo }> = (props) => { + const ttl = createMemo(() => { + if (!props.board.is_online) { + return '--'; + } + + if (props.board.ttl == null) { + return 'timeout'; + } + + return ( + <> + {props.board.ttl.toFixed(0)} ms + + ); + }); + + return ( +
    + {props.board.host} + + {props.board.address} + + + {props.board.port} + + + {props.board.is_online ? 'Online' : 'Offline'} + + {ttl()} +
    + ); +}; diff --git a/src/models/board-info.model.ts b/src/models/board-info.model.ts index 6affb85..6cb261f 100644 --- a/src/models/board-info.model.ts +++ b/src/models/board-info.model.ts @@ -1,5 +1,8 @@ export type BoardInfo = { - name: string; + host: string; address: string; port: number; + ttl: number; + is_online: boolean; + checked_at: Date; };