From 0e77ffe9ba7d6b3fc35910171e138a4090d07c65 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Thu, 2 May 2024 05:06:47 +0000 Subject: [PATCH] fix: auto reconnection. --- src/main.rs | 52 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7457c3a..92e493d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use std::{format, time::Duration}; use clap::Parser; use futures_util::{pin_mut, StreamExt}; -use log::info; +use log::{error, info}; use tokio::{ io::{stdout, AsyncWriteExt}, time::sleep, @@ -50,43 +50,57 @@ async fn main() { let args = Args::parse(); tokio::spawn(async move { - poll_wan_traffic( - args.luci_url.as_str(), - args.luci_username.as_str(), - args.luci_password.as_str(), - ) - .await - .unwrap(); + loop { + if let Err(err) = poll_wan_traffic( + args.luci_url.as_str(), + args.luci_username.as_str(), + args.luci_password.as_str(), + ) + .await + { + error!("error: {}", err); + } + + sleep(Duration::from_secs(1)).await; + error!("restart poll_wan_traffic!"); + } }); let connect_addr = args.clash_url; loop { - pipe(connect_addr.clone()).await; + if let Err(err) = pipe(connect_addr.clone()).await { + error!("{}", err); + }; sleep(Duration::from_secs(1)).await; - info!("restart!"); + error!("restart clash!"); } } -async fn pipe(connect_addr: String) { - let url = url::Url::parse(&connect_addr).unwrap(); +async fn pipe(connect_addr: String) -> anyhow::Result<()> { + let url = url::Url::parse(&connect_addr).map_err(|err| anyhow::anyhow!(err))?; - let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); + let (ws_stream, _) = connect_async(url) + .await + .map_err(|err| anyhow::anyhow!(err))?; println!("WebSocket handshake has been successfully completed"); let (_, read) = ws_stream.split(); let ws_to_stdout = { read.for_each(|message| async { - let data = message.unwrap().into_data(); + if let Err(err) = message { + error!("bad message. {}", err); + return; + } + + let message = message.unwrap(); + let data = message.into_data(); let wrapper = serde_json::from_slice::(&data); if let Err(err) = wrapper { - stdout() - .write_all(format!("Error: {}\n", err).as_bytes()) - .await - .unwrap(); + error!("parse message failed. {}", err); return; } let wrapper = wrapper.unwrap(); @@ -126,4 +140,6 @@ async fn pipe(connect_addr: String) { pin_mut!(ws_to_stdout); ws_to_stdout.await; + + Ok(()) }