119 lines
3.2 KiB
Rust
119 lines
3.2 KiB
Rust
use std::{
|
|
collections::HashMap,
|
|
net::{Ipv4Addr, SocketAddr},
|
|
sync::Arc,
|
|
};
|
|
|
|
use log::{error, info};
|
|
use tokio::{
|
|
net::UdpSocket,
|
|
sync::{Mutex, OnceCell},
|
|
time::Instant,
|
|
};
|
|
use clap::Parser;
|
|
|
|
use crate::Args;
|
|
|
|
struct Client {
|
|
last_seen: Instant,
|
|
socket: UdpSocket,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct UdpServer {
|
|
listen_addr: Ipv4Addr,
|
|
listen_port: u16,
|
|
clients: Arc<Mutex<HashMap<SocketAddr, Client>>>,
|
|
}
|
|
|
|
impl UdpServer {
|
|
pub async fn global() -> &'static Self {
|
|
static UDP_SERVER: OnceCell<UdpServer> = OnceCell::const_new();
|
|
|
|
UDP_SERVER
|
|
.get_or_init(|| async {
|
|
let args = Args::parse();
|
|
let listen_addr = Ipv4Addr::new(0, 0, 0, 0);
|
|
let listen_port = args.listen_port;
|
|
|
|
let udp_server = UdpServer::new(listen_addr, listen_port);
|
|
let server = udp_server.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(err) = server.run().await {
|
|
error!("Failed to run UDP server: {}", err);
|
|
}
|
|
});
|
|
|
|
udp_server
|
|
})
|
|
.await
|
|
}
|
|
|
|
pub fn new(listen_addr: Ipv4Addr, listen_port: u16) -> Self {
|
|
Self {
|
|
listen_addr,
|
|
listen_port,
|
|
clients: Arc::new(Mutex::new(HashMap::new())),
|
|
}
|
|
}
|
|
|
|
pub async fn run(&self) -> anyhow::Result<()> {
|
|
let mut buf = [0; 1024];
|
|
let socket = UdpSocket::bind((self.listen_addr, self.listen_port)).await?;
|
|
|
|
loop {
|
|
let (len, addr) = socket.recv_from(&mut buf).await?;
|
|
|
|
info!("Received data({}) from {}", len, addr);
|
|
|
|
let mut clients = self.clients.lock().await;
|
|
|
|
let client = clients.get_mut(&addr);
|
|
|
|
if client.is_none() {
|
|
let socket = UdpSocket::bind((self.listen_addr, 0)).await;
|
|
if let Err(err) = socket {
|
|
error!("Failed to bind socket: {}", err);
|
|
continue;
|
|
}
|
|
let socket = socket.unwrap();
|
|
if let Err(err) = socket.connect(addr).await {
|
|
error!("Failed to connect socket: {}", err);
|
|
continue;
|
|
}
|
|
|
|
let client = Client {
|
|
socket,
|
|
last_seen: Instant::now(),
|
|
};
|
|
|
|
clients.insert(addr, client);
|
|
} else {
|
|
let client = client.unwrap();
|
|
client.last_seen = Instant::now();
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn publish(&self, buf: &[u8]) {
|
|
let mut to_remove = Vec::new();
|
|
let mut clients = self.clients.lock().await;
|
|
|
|
for (addr, client) in clients.iter() {
|
|
if client.last_seen.elapsed().as_secs() > 10 {
|
|
to_remove.push(addr.clone());
|
|
} else {
|
|
if let Err(err) = client.socket.send(buf).await {
|
|
error!("Failed to send data to {}: {}", addr, err);
|
|
} else {
|
|
info!("Sent data to {}", addr);
|
|
}
|
|
}
|
|
}
|
|
|
|
for addr in to_remove {
|
|
clients.remove(&addr);
|
|
}
|
|
}
|
|
}
|