fix: auto reconnection.
All checks were successful
Gitea Actions Demo / build (push) Successful in 3m55s

This commit is contained in:
Ivan Li 2024-05-02 05:06:47 +00:00
parent 34b301cf5a
commit 0e77ffe9ba

View File

@ -2,7 +2,7 @@ use std::{format, time::Duration};
use clap::Parser; use clap::Parser;
use futures_util::{pin_mut, StreamExt}; use futures_util::{pin_mut, StreamExt};
use log::info; use log::{error, info};
use tokio::{ use tokio::{
io::{stdout, AsyncWriteExt}, io::{stdout, AsyncWriteExt},
time::sleep, time::sleep,
@ -50,43 +50,57 @@ async fn main() {
let args = Args::parse(); let args = Args::parse();
tokio::spawn(async move { tokio::spawn(async move {
poll_wan_traffic( loop {
args.luci_url.as_str(), if let Err(err) = poll_wan_traffic(
args.luci_username.as_str(), args.luci_url.as_str(),
args.luci_password.as_str(), args.luci_username.as_str(),
) args.luci_password.as_str(),
.await )
.unwrap(); .await
{
error!("error: {}", err);
}
sleep(Duration::from_secs(1)).await;
error!("restart poll_wan_traffic!");
}
}); });
let connect_addr = args.clash_url; let connect_addr = args.clash_url;
loop { loop {
pipe(connect_addr.clone()).await; if let Err(err) = pipe(connect_addr.clone()).await {
error!("{}", err);
};
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
info!("restart!"); error!("restart clash!");
} }
} }
async fn pipe(connect_addr: String) { async fn pipe(connect_addr: String) -> anyhow::Result<()> {
let url = url::Url::parse(&connect_addr).unwrap(); let url = url::Url::parse(&connect_addr).map_err(|err| anyhow::anyhow!(err))?;
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); let (ws_stream, _) = connect_async(url)
.await
.map_err(|err| anyhow::anyhow!(err))?;
println!("WebSocket handshake has been successfully completed"); println!("WebSocket handshake has been successfully completed");
let (_, read) = ws_stream.split(); let (_, read) = ws_stream.split();
let ws_to_stdout = { let ws_to_stdout = {
read.for_each(|message| async { read.for_each(|message| async {
let data = message.unwrap().into_data(); if let Err(err) = message {
error!("bad message. {}", err);
return;
}
let message = message.unwrap();
let data = message.into_data();
let wrapper = serde_json::from_slice::<clash_conn_msg::ClashConnectionsWrapper>(&data); let wrapper = serde_json::from_slice::<clash_conn_msg::ClashConnectionsWrapper>(&data);
if let Err(err) = wrapper { if let Err(err) = wrapper {
stdout() error!("parse message failed. {}", err);
.write_all(format!("Error: {}\n", err).as_bytes())
.await
.unwrap();
return; return;
} }
let wrapper = wrapper.unwrap(); let wrapper = wrapper.unwrap();
@ -126,4 +140,6 @@ async fn pipe(connect_addr: String) {
pin_mut!(ws_to_stdout); pin_mut!(ws_to_stdout);
ws_to_stdout.await; ws_to_stdout.await;
Ok(())
} }