use std::time::Duration; use anyhow::{anyhow, bail}; use clap::Parser; use futures_util::StreamExt; use log::{debug, error}; use tokio::time::sleep; use tokio_tungstenite::{ connect_async, tungstenite::{protocol::{frame::coding::CloseCode, CloseFrame}, Message}, }; mod clash_conn_msg; mod statistics; mod udp_server; mod wan; use wan::poll_wan_traffic; #[derive(Parser, Debug)] #[clap( version = "0.1.0", author = "Ivan Li", about = "Watch Clash network traffic" )] struct Args { #[clap( short, env, long, default_value = "ws://192.168.1.1:9090/connections?token=123456" )] clash_url: String, #[clap(short = 'p', long, env, default_value = "17890")] listen_port: u16, #[clap(short, long, env, default_value = "http://192.168.1.1/cgi-bin/luci")] luci_url: String, #[clap(short = 'u', long, env, default_value = "root")] luci_username: String, #[clap(short = 'P', long, env, default_value = "123456")] luci_password: String, } #[tokio::main] async fn main() { dotenvy::dotenv().ok(); env_logger::init(); println!("Hello, world!"); let args = Args::parse(); tokio::spawn(async move { loop { if let Err(err) = poll_wan_traffic( args.luci_url.as_str(), args.luci_username.as_str(), args.luci_password.as_str(), ) .await { error!("error: {}", err); } sleep(Duration::from_secs(1)).await; error!("restart poll_wan_traffic!"); } }); let connect_addr = args.clash_url; loop { if let Err(err) = pipe(connect_addr.clone()).await { error!("{}", err); }; sleep(Duration::from_secs(1)).await; error!("restart clash!"); } } async fn pipe(connect_addr: String) -> anyhow::Result<()> { let url = url::Url::parse(&connect_addr).map_err(|err| anyhow::anyhow!(err))?; let (mut ws_stream, _) = connect_async(url) .await .map_err(|err| anyhow::anyhow!(err))?; println!("WebSocket handshake has been successfully completed"); while let Some(message) = ws_stream.next().await { let message = message?; match message { Message::Text(text) => { let data = text.into_bytes(); let wrapper = serde_json::from_slice::(&data); if let Err(err) = wrapper { error!("parse message failed. {}", err); ws_stream .close(Some(CloseFrame { code: CloseCode::Unsupported, reason: "parse message failed".into(), })).await .map_err(|err| anyhow!(err))?; bail!(err); } let wrapper = wrapper.unwrap(); let statistics_manager = statistics::StatisticsManager::global().await; statistics_manager.update(&wrapper).await; let state = statistics_manager.get_state().await; debug!("len: {},speed_upload: {}, speed_download: {}, update_direct_speed: {}, download_direct_speed: {}, update_proxy_speed: {}, download_proxy_speed: {}", state.connections, state.speed_upload, state.speed_download, state.direct_upload_speed, state.direct_download_speed, state.proxy_upload_speed, state.proxy_download_speed, ); let udp_server = udp_server::UdpServer::global().await; let mut buf = [0; 32]; buf[0..8].copy_from_slice(&state.direct_upload_speed.to_le_bytes()); buf[8..16].copy_from_slice(&state.direct_download_speed.to_le_bytes()); buf[16..24].copy_from_slice(&state.proxy_upload_speed.to_le_bytes()); buf[24..32].copy_from_slice(&state.proxy_download_speed.to_le_bytes()); udp_server.publish_clash(&buf).await; } Message::Close(_) => { println!("Server requested close"); break; } _ => {} } } Ok(()) }