fix: auto reconnection x2.

This commit is contained in:
Ivan Li 2024-05-03 04:00:27 +00:00
parent 0bedfb261c
commit bb015bf662

View File

@ -1,13 +1,14 @@
use std::{format, time::Duration}; use std::time::Duration;
use anyhow::{anyhow, bail};
use clap::Parser; use clap::Parser;
use futures_util::{pin_mut, StreamExt}; use futures_util::StreamExt;
use log::{error, info}; use log::{debug, error};
use tokio::{ use tokio::time::sleep;
io::{stdout, AsyncWriteExt}, use tokio_tungstenite::{
time::sleep, connect_async,
tungstenite::{protocol::{frame::coding::CloseCode, CloseFrame}, Message},
}; };
use tokio_tungstenite::connect_async;
mod clash_conn_msg; mod clash_conn_msg;
mod statistics; mod statistics;
@ -80,28 +81,30 @@ async fn main() {
async fn pipe(connect_addr: String) -> anyhow::Result<()> { async fn pipe(connect_addr: String) -> anyhow::Result<()> {
let url = url::Url::parse(&connect_addr).map_err(|err| anyhow::anyhow!(err))?; let url = url::Url::parse(&connect_addr).map_err(|err| anyhow::anyhow!(err))?;
let (ws_stream, _) = connect_async(url) let (mut ws_stream, _) = connect_async(url)
.await .await
.map_err(|err| anyhow::anyhow!(err))?; .map_err(|err| anyhow::anyhow!(err))?;
println!("WebSocket handshake has been successfully completed"); println!("WebSocket handshake has been successfully completed");
let (_, read) = ws_stream.split(); while let Some(message) = ws_stream.next().await {
let message = message?;
let ws_to_stdout = { match message {
read.for_each(|message| async { Message::Text(text) => {
if let Err(err) = message { let data = text.into_bytes();
error!("bad message. {}", err);
return;
}
let message = message.unwrap(); let wrapper =
let data = message.into_data(); serde_json::from_slice::<clash_conn_msg::ClashConnectionsWrapper>(&data);
let wrapper = serde_json::from_slice::<clash_conn_msg::ClashConnectionsWrapper>(&data);
if let Err(err) = wrapper { if let Err(err) = wrapper {
error!("parse message failed. {}", err); error!("parse message failed. {}", err);
return; ws_stream
.close(Some(CloseFrame {
code: CloseCode::Unsupported,
reason: "parse message failed".into(),
})).await
.map_err(|err| anyhow!(err))?;
bail!(err);
} }
let wrapper = wrapper.unwrap(); let wrapper = wrapper.unwrap();
@ -111,22 +114,15 @@ async fn pipe(connect_addr: String) -> anyhow::Result<()> {
let state = statistics_manager.get_state().await; let state = statistics_manager.get_state().await;
// stdout() debug!("len: {},speed_upload: {}, speed_download: {}, update_direct_speed: {}, download_direct_speed: {}, update_proxy_speed: {}, download_proxy_speed: {}",
// .write_all( state.connections,
// format!( state.speed_upload,
// "len: {},speed_upload: {}, speed_download: {}, update_direct_speed: {}, download_direct_speed: {}, update_proxy_speed: {}, download_proxy_speed: {}\n", state.speed_download,
// state.connections, state.direct_upload_speed,
// state.speed_upload, state.direct_download_speed,
// state.speed_download, state.proxy_upload_speed,
// state.direct_upload_speed, state.proxy_download_speed,
// state.direct_download_speed, );
// state.proxy_upload_speed,
// state.proxy_download_speed,
// )
// .as_bytes(),
// )
// .await
// .unwrap();
let udp_server = udp_server::UdpServer::global().await; let udp_server = udp_server::UdpServer::global().await;
let mut buf = [0; 32]; let mut buf = [0; 32];
@ -135,11 +131,14 @@ async fn pipe(connect_addr: String) -> anyhow::Result<()> {
buf[16..24].copy_from_slice(&state.proxy_upload_speed.to_le_bytes()); buf[16..24].copy_from_slice(&state.proxy_upload_speed.to_le_bytes());
buf[24..32].copy_from_slice(&state.proxy_download_speed.to_le_bytes()); buf[24..32].copy_from_slice(&state.proxy_download_speed.to_le_bytes());
udp_server.publish_clash(&buf).await; udp_server.publish_clash(&buf).await;
}) }
}; Message::Close(_) => {
println!("Server requested close");
pin_mut!(ws_to_stdout); break;
ws_to_stdout.await; }
_ => {}
}
}
Ok(()) Ok(())
} }