From c44018e6334fe26e9695bbe107bcbac8914e1cf6 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 14 May 2023 11:55:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E9=80=9A=E8=BF=87=20?= =?UTF-8?q?udp=20=E8=BD=AC=E5=8F=91=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 78 ++++++++++++++++++++++++++++++ Cargo.toml | 3 ++ src/main.rs | 39 ++++++++------- src/statistics.rs | 4 +- src/udp_server.rs | 119 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 226 insertions(+), 17 deletions(-) create mode 100644 src/udp_server.rs diff --git a/Cargo.lock b/Cargo.lock index 5597889..4d6ac80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04" +dependencies = [ + "memchr", +] + [[package]] name = "anstream" version = "0.3.2" @@ -51,6 +60,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "anyhow" +version = "1.0.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" + [[package]] name = "autocfg" version = "1.1.0" @@ -179,6 +194,19 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "errno" version = "0.3.1" @@ -318,6 +346,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "idna" version = "0.3.0" @@ -388,6 +422,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + [[package]] name = "mio" version = "0.8.6" @@ -404,8 +444,11 @@ dependencies = [ name = "network-monitor" version = "0.1.0" dependencies = [ + "anyhow", "clap", + "env_logger", "futures-util", + "log", "serde", "serde_json", "tokio", @@ -533,6 +576,23 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" + [[package]] name = "rustix" version = "0.37.19" @@ -652,6 +712,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "termcolor" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.40" @@ -826,6 +895,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 2231417..9d58be6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,11 @@ version = "0.1.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.71" clap = {version = "4.2.7", features = ["derive"]} +env_logger = "0.10.0" futures-util = "0.3.28" +log = "0.4.17" serde = { version = "1.0.163", features = ["derive"] } serde_json = "1.0.96" tokio = {version = "1.11.0", features = ["full"]} diff --git a/src/main.rs b/src/main.rs index 287366d..89ad247 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use tokio_tungstenite::connect_async; mod clash_conn_msg; mod statistics; +mod udp_server; #[derive(Parser)] #[clap( @@ -27,6 +28,8 @@ struct Args { #[tokio::main] async fn main() { + env_logger::init(); + println!("Hello, world!"); let args = Args::parse(); @@ -61,22 +64,26 @@ async fn main() { let state = statistics_manager.get_state().await; - stdout() - .write_all( - format!( - "len: {},speed_upload: {}, speed_download: {}, update_direct_speed: {}, download_direct_speed: {}, update_proxy_speed: {}, download_proxy_speed: {}\n", - state.connections, - state.speed_upload, - state.speed_download, - state.direct_upload_speed, - state.direct_download_speed, - state.proxy_upload_speed, - state.proxy_download_speed, - ) - .as_bytes(), - ) - .await - .unwrap(); + // stdout() + // .write_all( + // format!( + // "len: {},speed_upload: {}, speed_download: {}, update_direct_speed: {}, download_direct_speed: {}, update_proxy_speed: {}, download_proxy_speed: {}\n", + // state.connections, + // state.speed_upload, + // state.speed_download, + // state.direct_upload_speed, + // state.direct_download_speed, + // state.proxy_upload_speed, + // state.proxy_download_speed, + // ) + // .as_bytes(), + // ) + // .await + // .unwrap(); + + let udp_server = udp_server::UdpServer::global().await; + let buf = serde_json::to_vec(&state).unwrap(); + udp_server.publish(&buf).await; }) }; diff --git a/src/statistics.rs b/src/statistics.rs index f69a7c3..d9573a2 100644 --- a/src/statistics.rs +++ b/src/statistics.rs @@ -1,10 +1,11 @@ use std::{collections::HashMap, sync::Arc}; +use serde::Serialize; use tokio::sync::{Mutex, OnceCell}; use crate::clash_conn_msg::{ClashConnectionMessage, ClashConnectionsWrapper}; -#[derive(Clone)] +#[derive(Clone, Serialize)] pub struct StatisticsState { pub connections: usize, pub download_total: u64, @@ -15,6 +16,7 @@ pub struct StatisticsState { pub direct_download_speed: u64, pub proxy_upload_speed: u64, pub proxy_download_speed: u64, + #[serde(skip)] connection_map: Arc>>, } diff --git a/src/udp_server.rs b/src/udp_server.rs new file mode 100644 index 0000000..639af4c --- /dev/null +++ b/src/udp_server.rs @@ -0,0 +1,119 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr}, + sync::Arc, +}; + +use log::{error, info}; +use tokio::{ + net::UdpSocket, + sync::{Mutex, OnceCell}, + time::Instant, +}; +use clap::Parser; + +use crate::Args; + +struct Client { + last_seen: Instant, + socket: UdpSocket, +} + +#[derive(Clone)] +pub struct UdpServer { + listen_addr: Ipv4Addr, + listen_port: u16, + clients: Arc>>, +} + +impl UdpServer { + pub async fn global() -> &'static Self { + static UDP_SERVER: OnceCell = OnceCell::const_new(); + + UDP_SERVER + .get_or_init(|| async { + let args = Args::parse(); + let listen_addr = Ipv4Addr::new(0, 0, 0, 0); + let listen_port = args.listen_port; + + let udp_server = UdpServer::new(listen_addr, listen_port); + let server = udp_server.clone(); + tokio::spawn(async move { + if let Err(err) = server.run().await { + error!("Failed to run UDP server: {}", err); + } + }); + + udp_server + }) + .await + } + + pub fn new(listen_addr: Ipv4Addr, listen_port: u16) -> Self { + Self { + listen_addr, + listen_port, + clients: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn run(&self) -> anyhow::Result<()> { + let mut buf = [0; 1024]; + let socket = UdpSocket::bind((self.listen_addr, self.listen_port)).await?; + + loop { + info!("Waiting for data"); + let (len, addr) = socket.recv_from(&mut buf).await?; + + info!("Received data({}) from {}", len, addr); + + let mut clients = self.clients.lock().await; + + let client = clients.get_mut(&addr); + + if client.is_none() { + let socket = UdpSocket::bind((self.listen_addr, 0)).await; + if let Err(err) = socket { + error!("Failed to bind socket: {}", err); + continue; + } + let socket = socket.unwrap(); + if let Err(err) = socket.connect(addr).await { + error!("Failed to connect socket: {}", err); + continue; + } + + let client = Client { + socket, + last_seen: Instant::now(), + }; + + clients.insert(addr, client); + } else { + let client = client.unwrap(); + client.last_seen = Instant::now(); + } + } + } + + pub async fn publish(&self, buf: &[u8]) { + let mut to_remove = Vec::new(); + let mut clients = self.clients.lock().await; + + for (addr, client) in clients.iter() { + if client.last_seen.elapsed().as_secs() > 10 { + to_remove.push(addr.clone()); + } else { + if let Err(err) = client.socket.send(buf).await { + error!("Failed to send data to {}: {}", addr, err); + } else { + info!("Sent data to {}", addr); + } + } + } + + for addr in to_remove { + clients.remove(&addr); + } + } +}