use std::time::Duration; use anyhow::anyhow; use anyhow::bail; use anyhow::Result; use http_body_util::BodyExt; use http_body_util::Empty; use hyper::StatusCode; use hyper::{ body::{Buf, Bytes}, Request, }; use hyper_util::rt::TokioIo; use log::debug; use serde::{Deserialize, Serialize}; use tokio::net::TcpStream; use tokio::time::sleep; use crate::udp_server; pub type OpenWRTIFaces = Vec; pub async fn poll_wan_traffic(luci_url: &str, username: &str, password: &str) -> Result<()> { // Parse our URL... let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::()?; // Get the host and the port let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); let address = format!("{}:{}", host, port); let mut cookies_str = String::new(); // Open a TCP connection to the remote host loop { let stream = TcpStream::connect(address.clone()).await?; // Use an adapter to access something implementing `tokio::io` traits as if they implement // `hyper::rt` IO traits. let io = TokioIo::new(stream); // Create the Hyper client let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; // Spawn a task to poll the connection, driving the HTTP state tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); } }); let mut prev_up = 0; let mut prev_down = 0; loop { if sender.is_closed() { break; } if !sender.is_ready() { sleep(Duration::from_millis(100)).await; continue; } let poll_req = generate_poll_wan_req(luci_url, &cookies_str)?; // Await the response... let res = sender.send_request(poll_req.clone()).await?; debug!("Response status: {}", res.status()); match res.status() { StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => { cookies_str = login(luci_url, username, password).await?; continue; } _ => {} } // asynchronously aggregate the chunks of the body let body = res.collect().await?.aggregate(); // try to parse as json with serde_json let interfaces: OpenWRTIFaces = serde_json::from_reader(body.reader())?; let wan = interfaces.first().unwrap(); if prev_up > wan.tx_bytes || prev_down > wan.rx_bytes { prev_up = wan.tx_bytes; prev_down = wan.rx_bytes; } let up_speed = (wan.tx_bytes - prev_up) as u64; let down_speed = (wan.rx_bytes - prev_down) as u64; prev_up = wan.tx_bytes; prev_down = wan.rx_bytes; let udp_server = udp_server::UdpServer::global().await; let mut buf = [0; 16]; buf[0..8].copy_from_slice(&up_speed.to_le_bytes()); buf[8..16].copy_from_slice(&down_speed.to_le_bytes()); udp_server.publish_wan(&buf).await; // println!("speed: ↑ {}, ↓ {}", up_speed, down_speed); sleep(Duration::from_secs(1)).await; } } } async fn login(luci_url: &str, username: &str, password: &str) -> Result { let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::()?; let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); let address = format!("{}:{}", host, port); let stream = TcpStream::connect(address).await?; let io = TokioIo::new(stream); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); } }); let login_req = generate_login_req(luci_url, username, password)?; let res = sender.send_request(login_req.clone()).await?; if res.status() == StatusCode::FORBIDDEN { bail!("Login failed, got status: {}", res.status()); } let cookies = res.headers().get_all(hyper::header::SET_COOKIE); let cookies = cookies .iter() .map(|cookie| cookie.to_str().unwrap().split(';').next()); let cookies = cookies.filter_map(|cookie| cookie); let cookies_str = cookies.collect::>().join("; "); Ok(cookies_str) } fn generate_poll_wan_req(luci_url: &str, cookie: &str) -> Result>> { let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::()?; let authority = url.authority().unwrap().clone(); let target = url.path_and_query().unwrap().clone(); debug!("Polling WAN traffic. Cookie: {:?}", cookie); return Request::builder() .uri(target) .header(hyper::header::HOST, authority.as_str()) .header(hyper::header::CONNECTION, "Keep-Alive") .header(hyper::header::COOKIE, cookie) .body(Empty::::new()) .map_err(|e| anyhow!(e)); } fn generate_login_req(luci_url: &str, username: &str, password: &str) -> Result> { let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::()?; let authority = url.authority().unwrap().clone(); let target = url.path_and_query().unwrap().clone(); let login_params = format!("luci_username={}&luci_password={}", username, password); let body = login_params; return Request::builder() .uri(target) .method("POST") .header(hyper::header::HOST, authority.as_str()) .header( hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded", ) .header(hyper::header::CONNECTION, "Keep-Alive") .body(body) .map_err(|e| anyhow!(e)); } #[derive(Serialize, Deserialize)] pub struct OpenWRTIFace { rx_bytes: u64, tx_bytes: u64, } impl Default for OpenWRTIFace { fn default() -> Self { Self { rx_bytes: 0, tx_bytes: 0, } } }