Compare commits

..

4 Commits
ci ... master

Author SHA1 Message Date
c94243b838 chore: rm .env.
All checks were successful
Gitea Actions Demo / build (push) Successful in 1m1s
2024-05-03 04:49:06 +00:00
bb015bf662 fix: auto reconnection x2. 2024-05-03 04:00:27 +00:00
0bedfb261c fix: auto reconnection. 2024-05-02 05:06:47 +00:00
ca110fd56d feat: adding OpenWRT WAN port speeds. 2024-03-30 17:47:53 +08:00
9 changed files with 924 additions and 238 deletions

View File

@ -3,7 +3,7 @@
{ {
"name": "Rust", "name": "Rust",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/rust:0-1-bullseye", "image": "mcr.microsoft.com/devcontainers/rust:1-bullseye",
"features": { "features": {
"ghcr.io/devcontainers/features/git:1": {}, "ghcr.io/devcontainers/features/git:1": {},
"ghcr.io/devcontainers/features/docker-in-docker:2": {} "ghcr.io/devcontainers/features/docker-in-docker:2": {}
@ -13,8 +13,9 @@
"extensions": [ "extensions": [
"mhutchie.git-graph", "mhutchie.git-graph",
"donjayamanne.githistory", "donjayamanne.githistory",
"GitHub.copilot", "eamodio.gitlens",
"eamodio.gitlens" "rust-lang.rust-analyzer",
"Codeium.codeium"
] ]
} }
} }

1
.gitignore vendored
View File

@ -1 +1,2 @@
/target /target
.env

45
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,45 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Debug executable 'network-monitor'",
"cargo": {
"args": [
"build",
"--bin=network-monitor",
"--package=network-monitor"
],
"filter": {
"name": "network-monitor",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in executable 'network-monitor'",
"cargo": {
"args": [
"test",
"--no-run",
"--bin=network-monitor",
"--package=network-monitor"
],
"filter": {
"name": "network-monitor",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}"
}
]
}

6
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,6 @@
{
"cSpell.words": [
"luci"
],
"lldb.library": "/Library/Developer/CommandLineTools/Library/PrivateFrameworks/LLDB.framework/Versions/A/LLDB"
}

701
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,13 +6,17 @@ version = "0.1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = "1.0.71" anyhow = "1.0.81"
clap = {version = "4.2.7", features = ["derive"]} clap = {version = "4.5.3", features = ["derive", "env"]}
env_logger = "0.10.0" dotenvy = { version = "0.15.7", features = ["clap", "cli"] }
futures-util = "0.3.28" env_logger = "0.11.3"
futures-util = "0.3.30"
http-body-util = "0.1"
hyper = {version = "1.2.0", features = ["full"]}
hyper-util = {version = "0.1", features = ["full"]}
log = "0.4.17" log = "0.4.17"
serde = { version = "1.0.163", features = ["derive"] } serde = {version = "1.0.197", features = ["derive"]}
serde_json = "1.0.96" serde_json = "1.0.114"
tokio = {version = "1.11.0", features = ["full"]} tokio = {version = "1.36.0", features = ["full"]}
tokio-tungstenite = "0.18.0" tokio-tungstenite = "0.21.0"
url = "2.3.1" url = "2.5.0"

View File

@ -1,19 +1,23 @@
use std::{format, time::Duration}; use std::time::Duration;
use anyhow::{anyhow, bail};
use clap::Parser; use clap::Parser;
use futures_util::{pin_mut, StreamExt}; use futures_util::StreamExt;
use log::info; use log::{debug, error};
use tokio::{ use tokio::time::sleep;
io::{stdout, AsyncWriteExt}, use tokio_tungstenite::{
time::sleep, connect_async,
tungstenite::{protocol::{frame::coding::CloseCode, CloseFrame}, Message},
}; };
use tokio_tungstenite::connect_async;
mod clash_conn_msg; mod clash_conn_msg;
mod statistics; mod statistics;
mod udp_server; mod udp_server;
mod wan;
#[derive(Parser)] use wan::poll_wan_traffic;
#[derive(Parser, Debug)]
#[clap( #[clap(
version = "0.1.0", version = "0.1.0",
author = "Ivan Li", author = "Ivan Li",
@ -22,51 +26,85 @@ mod udp_server;
struct Args { struct Args {
#[clap( #[clap(
short, short,
env,
long, long,
default_value = "ws://192.168.31.1:9090/connections?token=123456" default_value = "ws://192.168.1.1:9090/connections?token=123456"
)] )]
clash_url: String, clash_url: String,
#[clap(short, long, default_value = "17890")] #[clap(short = 'p', long, env, default_value = "17890")]
listen_port: u16, listen_port: u16,
#[clap(short, long, env, default_value = "http://192.168.1.1/cgi-bin/luci")]
luci_url: String,
#[clap(short = 'u', long, env, default_value = "root")]
luci_username: String,
#[clap(short = 'P', long, env, default_value = "123456")]
luci_password: String,
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
dotenvy::dotenv().ok();
env_logger::init(); env_logger::init();
println!("Hello, world!"); println!("Hello, world!");
let args = Args::parse(); let args = Args::parse();
tokio::spawn(async move {
loop {
if let Err(err) = poll_wan_traffic(
args.luci_url.as_str(),
args.luci_username.as_str(),
args.luci_password.as_str(),
)
.await
{
error!("error: {}", err);
}
sleep(Duration::from_secs(1)).await;
error!("restart poll_wan_traffic!");
}
});
let connect_addr = args.clash_url; let connect_addr = args.clash_url;
loop { loop {
pipe(connect_addr.clone()).await; if let Err(err) = pipe(connect_addr.clone()).await {
error!("{}", err);
};
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
info!("restart!"); error!("restart clash!");
} }
} }
async fn pipe(connect_addr: String) { async fn pipe(connect_addr: String) -> anyhow::Result<()> {
let url = url::Url::parse(&connect_addr).unwrap(); let url = url::Url::parse(&connect_addr).map_err(|err| anyhow::anyhow!(err))?;
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); let (mut ws_stream, _) = connect_async(url)
.await
.map_err(|err| anyhow::anyhow!(err))?;
println!("WebSocket handshake has been successfully completed"); println!("WebSocket handshake has been successfully completed");
let (_, read) = ws_stream.split(); while let Some(message) = ws_stream.next().await {
let message = message?;
let ws_to_stdout = { match message {
read.for_each(|message| async { Message::Text(text) => {
let data = message.unwrap().into_data(); let data = text.into_bytes();
let wrapper = serde_json::from_slice::<clash_conn_msg::ClashConnectionsWrapper>(&data); let wrapper =
serde_json::from_slice::<clash_conn_msg::ClashConnectionsWrapper>(&data);
if let Err(err) = wrapper { if let Err(err) = wrapper {
stdout() error!("parse message failed. {}", err);
.write_all(format!("Error: {}\n", err).as_bytes()) ws_stream
.await .close(Some(CloseFrame {
.unwrap(); code: CloseCode::Unsupported,
return; reason: "parse message failed".into(),
})).await
.map_err(|err| anyhow!(err))?;
bail!(err);
} }
let wrapper = wrapper.unwrap(); let wrapper = wrapper.unwrap();
@ -76,22 +114,15 @@ async fn pipe(connect_addr: String) {
let state = statistics_manager.get_state().await; let state = statistics_manager.get_state().await;
// stdout() debug!("len: {},speed_upload: {}, speed_download: {}, update_direct_speed: {}, download_direct_speed: {}, update_proxy_speed: {}, download_proxy_speed: {}",
// .write_all( state.connections,
// format!( state.speed_upload,
// "len: {},speed_upload: {}, speed_download: {}, update_direct_speed: {}, download_direct_speed: {}, update_proxy_speed: {}, download_proxy_speed: {}\n", state.speed_download,
// state.connections, state.direct_upload_speed,
// state.speed_upload, state.direct_download_speed,
// state.speed_download, state.proxy_upload_speed,
// state.direct_upload_speed, state.proxy_download_speed,
// state.direct_download_speed, );
// state.proxy_upload_speed,
// state.proxy_download_speed,
// )
// .as_bytes(),
// )
// .await
// .unwrap();
let udp_server = udp_server::UdpServer::global().await; let udp_server = udp_server::UdpServer::global().await;
let mut buf = [0; 32]; let mut buf = [0; 32];
@ -99,10 +130,15 @@ async fn pipe(connect_addr: String) {
buf[8..16].copy_from_slice(&state.direct_download_speed.to_le_bytes()); buf[8..16].copy_from_slice(&state.direct_download_speed.to_le_bytes());
buf[16..24].copy_from_slice(&state.proxy_upload_speed.to_le_bytes()); buf[16..24].copy_from_slice(&state.proxy_upload_speed.to_le_bytes());
buf[24..32].copy_from_slice(&state.proxy_download_speed.to_le_bytes()); buf[24..32].copy_from_slice(&state.proxy_download_speed.to_le_bytes());
udp_server.publish(&buf).await; udp_server.publish_clash(&buf).await;
}) }
}; Message::Close(_) => {
println!("Server requested close");
break;
}
_ => {}
}
}
pin_mut!(ws_to_stdout); Ok(())
ws_to_stdout.await;
} }

View File

@ -4,13 +4,13 @@ use std::{
sync::Arc, sync::Arc,
}; };
use clap::Parser;
use log::{error, info}; use log::{error, info};
use tokio::{ use tokio::{
net::UdpSocket, net::UdpSocket,
sync::{Mutex, OnceCell}, sync::{Mutex, OnceCell},
time::Instant, time::Instant,
}; };
use clap::Parser;
use crate::Args; use crate::Args;
@ -24,6 +24,7 @@ pub struct UdpServer {
listen_addr: Ipv4Addr, listen_addr: Ipv4Addr,
listen_port: u16, listen_port: u16,
clients: Arc<Mutex<HashMap<SocketAddr, Client>>>, clients: Arc<Mutex<HashMap<SocketAddr, Client>>>,
tx_buffer: Arc<Mutex<[u8; 48]>>,
} }
impl UdpServer { impl UdpServer {
@ -54,6 +55,7 @@ impl UdpServer {
listen_addr, listen_addr,
listen_port, listen_port,
clients: Arc::new(Mutex::new(HashMap::new())), clients: Arc::new(Mutex::new(HashMap::new())),
tx_buffer: Arc::new(Mutex::new([0u8; 48])),
} }
} }
@ -95,7 +97,25 @@ impl UdpServer {
} }
} }
pub async fn publish(&self, buf: &[u8]) { pub async fn publish_clash(&self, buf: &[u8]) {
let mut tx_buffer = self.tx_buffer.lock().await;
tx_buffer[..32].copy_from_slice(buf);
let buf = tx_buffer.clone();
drop(tx_buffer);
self.publish(&buf).await;
}
pub async fn publish_wan(&self, buf: &[u8]) {
let mut tx_buffer = self.tx_buffer.lock().await;
tx_buffer[32..].copy_from_slice(buf);
let buf = tx_buffer.clone();
drop(tx_buffer);
self.publish(&buf).await;
}
async fn publish(&self, buf: &[u8]) {
let mut to_remove = Vec::new(); let mut to_remove = Vec::new();
let mut clients = self.clients.lock().await; let mut clients = self.clients.lock().await;
@ -103,7 +123,7 @@ impl UdpServer {
if client.last_seen.elapsed().as_secs() > 10 { if client.last_seen.elapsed().as_secs() > 10 {
to_remove.push(addr.clone()); to_remove.push(addr.clone());
} else { } else {
if let Err(err) = client.socket.send(buf).await { if let Err(err) = client.socket.send(&buf).await {
error!("Failed to send data to {}: {}", addr, err); error!("Failed to send data to {}: {}", addr, err);
} else { } else {
info!("Sent data to {}", addr); info!("Sent data to {}", addr);

194
src/wan.rs Normal file
View File

@ -0,0 +1,194 @@
use std::time::Duration;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Result;
use http_body_util::BodyExt;
use http_body_util::Empty;
use hyper::StatusCode;
use hyper::{
body::{Buf, Bytes},
Request,
};
use hyper_util::rt::TokioIo;
use log::debug;
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio::time::sleep;
use crate::udp_server;
pub type OpenWRTIFaces = Vec<OpenWRTIFace>;
pub async fn poll_wan_traffic(luci_url: &str, username: &str, password: &str) -> Result<()> {
// Parse our URL...
let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::<hyper::Uri>()?;
// Get the host and the port
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let address = format!("{}:{}", host, port);
let mut cookies_str = String::new();
// Open a TCP connection to the remote host
loop {
let stream = TcpStream::connect(address.clone()).await?;
// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);
// Create the Hyper client
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
// Spawn a task to poll the connection, driving the HTTP state
tokio::task::spawn(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});
let mut prev_up = 0;
let mut prev_down = 0;
loop {
if sender.is_closed() {
break;
}
if !sender.is_ready() {
sleep(Duration::from_millis(100)).await;
continue;
}
let poll_req = generate_poll_wan_req(luci_url, &cookies_str)?;
// Await the response...
let res = sender.send_request(poll_req.clone()).await?;
debug!("Response status: {}", res.status());
match res.status() {
StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
cookies_str = login(luci_url, username, password).await?;
continue;
}
_ => {}
}
// asynchronously aggregate the chunks of the body
let body = res.collect().await?.aggregate();
// try to parse as json with serde_json
let interfaces: OpenWRTIFaces = serde_json::from_reader(body.reader())?;
let wan = interfaces.first().unwrap();
if prev_up > wan.tx_bytes || prev_down > wan.rx_bytes {
prev_up = wan.tx_bytes;
prev_down = wan.rx_bytes;
}
let up_speed = (wan.tx_bytes - prev_up) as u64;
let down_speed = (wan.rx_bytes - prev_down) as u64;
prev_up = wan.tx_bytes;
prev_down = wan.rx_bytes;
let udp_server = udp_server::UdpServer::global().await;
let mut buf = [0; 16];
buf[0..8].copy_from_slice(&up_speed.to_le_bytes());
buf[8..16].copy_from_slice(&down_speed.to_le_bytes());
udp_server.publish_wan(&buf).await;
// println!("speed: ↑ {}, ↓ {}", up_speed, down_speed);
sleep(Duration::from_secs(1)).await;
}
}
}
async fn login(luci_url: &str, username: &str, password: &str) -> Result<String> {
let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::<hyper::Uri>()?;
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let address = format!("{}:{}", host, port);
let stream = TcpStream::connect(address).await?;
let io = TokioIo::new(stream);
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});
let login_req = generate_login_req(luci_url, username, password)?;
let res = sender.send_request(login_req.clone()).await?;
if res.status() == StatusCode::FORBIDDEN {
bail!("Login failed, got status: {}", res.status());
}
let cookies = res.headers().get_all(hyper::header::SET_COOKIE);
let cookies = cookies
.iter()
.map(|cookie| cookie.to_str().unwrap().split(';').next());
let cookies = cookies.filter_map(|cookie| cookie);
let cookies_str = cookies.collect::<Vec<_>>().join("; ");
Ok(cookies_str)
}
fn generate_poll_wan_req(luci_url: &str, cookie: &str) -> Result<Request<Empty<Bytes>>> {
let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::<hyper::Uri>()?;
let authority = url.authority().unwrap().clone();
let target = url.path_and_query().unwrap().clone();
debug!("Polling WAN traffic. Cookie: {:?}", cookie);
return Request::builder()
.uri(target)
.header(hyper::header::HOST, authority.as_str())
.header(hyper::header::CONNECTION, "Keep-Alive")
.header(hyper::header::COOKIE, cookie)
.body(Empty::<Bytes>::new())
.map_err(|e| anyhow!(e));
}
fn generate_login_req(luci_url: &str, username: &str, password: &str) -> Result<Request<String>> {
let url = format!("{}/admin/network/iface_status/wan", luci_url).parse::<hyper::Uri>()?;
let authority = url.authority().unwrap().clone();
let target = url.path_and_query().unwrap().clone();
let login_params = format!("luci_username={}&luci_password={}", username, password);
let body = login_params;
return Request::builder()
.uri(target)
.method("POST")
.header(hyper::header::HOST, authority.as_str())
.header(
hyper::header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
.header(hyper::header::CONNECTION, "Keep-Alive")
.body(body)
.map_err(|e| anyhow!(e));
}
#[derive(Serialize, Deserialize)]
pub struct OpenWRTIFace {
rx_bytes: u64,
tx_bytes: u64,
}
impl Default for OpenWRTIFace {
fn default() -> Self {
Self {
rx_bytes: 0,
tx_bytes: 0,
}
}
}