pref: 调整发送数据的逻辑,改善丢包问题。

This commit is contained in:
2023-05-04 21:56:56 +08:00
parent ca9a2ba34d
commit 174840403f
6 changed files with 170 additions and 97 deletions

View File

@ -8,12 +8,11 @@ use tokio::{
sync::{watch, OnceCell, RwLock},
};
use super::{BoardInfo, BoardConnectStatus};
use super::{Board, BoardInfo};
#[derive(Debug, Clone)]
pub struct UdpRpc {
socket: Arc<UdpSocket>,
boards: Arc<RwLock<HashMap<String, BoardInfo>>>,
boards: Arc<RwLock<HashMap<String, Board>>>,
boards_change_sender: Arc<watch::Sender<Vec<BoardInfo>>>,
}
@ -31,14 +30,11 @@ impl UdpRpc {
}
async fn new() -> anyhow::Result<Self> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
let socket = Arc::new(socket);
let boards = Arc::new(RwLock::new(HashMap::new()));
let (boards_change_sender, _) = watch::channel(Vec::new());
let boards_change_sender = Arc::new(boards_change_sender);
Ok(Self {
socket,
boards,
boards_change_sender,
})
@ -91,40 +87,55 @@ impl UdpRpc {
let mut boards = self.boards.write().await;
let board = BoardInfo::new(
info.get_fullname().to_string(),
let board_info = BoardInfo::new(
info.get_fullname().to_string(),
info.get_hostname().to_string(),
info.get_addresses().iter().next().unwrap().clone(),
info.get_port(),
);
if boards.insert(board.fullname.clone(), board.clone()).is_some() {
info!("added board {:?}", board);
let mut board = Board::new(board_info.clone());
if let Err(err) = board.init_socket().await {
error!("failed to init socket: {:?}", err);
continue;
}
let tx_boards = boards.values().cloned().collect();
if boards.insert(board_info.fullname.clone(), board).is_some() {
info!("added board {:?}", board_info);
}
let tx_boards = boards
.values()
.map(|it| async move { it.info.read().await.clone() });
let tx_boards = join_all(tx_boards).await;
drop(boards);
sender.send(tx_boards)?;
tokio::task::yield_now().await;
}
ServiceEvent::ServiceRemoved(_, fullname) => {
info!("removed board {:?}", fullname);
info!("removed board {:?}", fullname);
let mut boards = self.boards.write().await;
if boards.remove(&fullname).is_some() {
info!("removed board {:?} successful", fullname);
}
let tx_boards = boards.values().cloned().collect();
let tx_boards = boards
.values()
.map(|it| async move { it.info.read().await.clone() });
let tx_boards = join_all(tx_boards).await;
drop(boards);
sender.send(tx_boards)?;
tokio::task::yield_now().await;
}
other_event => {
// log::info!("{:?}", &other_event);
}
}
tokio::task::yield_now().await;
}
Ok(())
@ -135,34 +146,38 @@ impl UdpRpc {
}
pub async fn get_boards(&self) -> Vec<BoardInfo> {
let boards = self.boards.read().await;
boards.values().cloned().collect()
self.boards_change_sender.borrow().clone()
}
pub async fn send_to_all(&self, buff: &Vec<u8>) -> anyhow::Result<()> {
let boards = self.get_boards().await;
let socket = self.socket.clone();
let boards = self.boards.read().await;
let handlers = boards.into_iter().map(|board| {
if board.connect_status == BoardConnectStatus::Disconnected {
return tokio::spawn(async move {
log::debug!("board {} is disconnected, skip.", board.host);
});
}
for board in boards.values() {
board.send_colors(buff).await;
}
let socket = socket.clone();
let buff = buff.clone();
tokio::spawn(async move {
match socket.send_to(&buff, (board.address, board.port)).await {
Ok(_) => {}
Err(err) => {
error!("failed to send to {}: {:?}", board.host, err);
}
}
})
});
// let socket = self.socket.clone();
join_all(handlers).await;
// let handlers = boards.into_iter().map(|board| {
// if board.connect_status == BoardConnectStatus::Disconnected {
// return tokio::spawn(async move {
// log::debug!("board {} is disconnected, skip.", board.host);
// });
// }
// let socket = socket.clone();
// let buff = buff.clone();
// tokio::spawn(async move {
// match socket.send_to(&buff, (board.address, board.port)).await {
// Ok(_) => {}
// Err(err) => {
// error!("failed to send to {}: {:?}", board.host, err);
// }
// }
// })
// });
// join_all(handlers).await;
Ok(())
}
@ -170,28 +185,35 @@ impl UdpRpc {
pub async fn check_boards(&self) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
let mut boards = self.boards.clone().write_owned().await;
tokio::task::yield_now().await;
interval.tick().await;
let boards = self.boards.read().await;
if boards.is_empty() {
info!("no boards found");
continue;
}
for (_, board) in boards.iter_mut() {
for board in boards.values() {
if let Err(err) = board.check().await {
error!("failed to check board {}: {:?}", board.host, err);
error!("failed to check board: {:?}", err);
}
}
let board_vec = boards.values().cloned().collect::<Vec<_>>();
let tx_boards = boards
.values()
.map(|it| async move { it.info.read().await.clone() });
let tx_boards = join_all(tx_boards).await;
drop(boards);
let board_change_sender = self.boards_change_sender.clone();
if let Err(err) = board_change_sender.send(board_vec) {
if let Err(err) = board_change_sender.send(tx_boards) {
error!("failed to send board change: {:?}", err);
}
drop(board_change_sender);
interval.tick().await;
}
}
}