fix: 更新配置时无法应用配置到灯带颜色获取逻辑。

This commit is contained in:
2023-04-14 22:18:59 +08:00
parent 9cbccedc72
commit a905c98823
4 changed files with 53 additions and 146 deletions

View File

@ -23,8 +23,6 @@ pub struct LedColorsPublisher {
sorted_colors_tx: Arc<RwLock<watch::Sender<Vec<u8>>>>,
colors_rx: Arc<RwLock<watch::Receiver<Vec<u8>>>>,
colors_tx: Arc<RwLock<watch::Sender<Vec<u8>>>>,
display_colors_rx: Arc<RwLock<broadcast::Receiver<(u32, Vec<u8>)>>>,
display_colors_tx: Arc<RwLock<broadcast::Sender<(u32, Vec<u8>)>>>,
inner_tasks_version: Arc<RwLock<usize>>,
}
@ -35,7 +33,6 @@ impl LedColorsPublisher {
let (sorted_tx, sorted_rx) = watch::channel(Vec::new());
let (tx, rx) = watch::channel(Vec::new());
let (display_colors_tx, display_colors_rx) = broadcast::channel(8);
LED_COLORS_PUBLISHER_GLOBAL
.get_or_init(|| async {
@ -44,8 +41,6 @@ impl LedColorsPublisher {
sorted_colors_tx: Arc::new(RwLock::new(sorted_tx)),
colors_rx: Arc::new(RwLock::new(rx)),
colors_tx: Arc::new(RwLock::new(tx)),
display_colors_rx: Arc::new(RwLock::new(display_colors_rx)),
display_colors_tx: Arc::new(RwLock::new(display_colors_tx)),
inner_tasks_version: Arc::new(RwLock::new(0)),
}
})
@ -56,13 +51,11 @@ impl LedColorsPublisher {
&self,
display_id: u32,
sample_points: Vec<Vec<LedSamplePoints>>,
display_colors_tx: broadcast::Sender<(u32, Vec<u8>)>,
) {
let display_colors_tx = self.display_colors_tx.clone();
let internal_tasks_version = self.inner_tasks_version.clone();
tokio::spawn(async move {
let display_colors_tx = display_colors_tx.read().await.clone();
let colors = screenshot_manager::get_display_colors(display_id, &sample_points);
if let Err(err) = colors {
@ -78,7 +71,9 @@ impl LedColorsPublisher {
interval.tick().await;
tokio::time::sleep(Duration::from_millis(1)).await;
if internal_tasks_version.read().await.clone() != init_version {
let version = internal_tasks_version.read().await.clone();
if version != init_version {
log::info!(
"inner task version changed, stop. {} != {}",
internal_tasks_version.read().await.clone(),
@ -121,14 +116,13 @@ impl LedColorsPublisher {
});
}
fn start_all_colors_worker(&self, display_ids: Vec<u32>, mappers: Vec<SamplePointMapper>) {
fn start_all_colors_worker(&self, display_ids: Vec<u32>, mappers: Vec<SamplePointMapper>, mut display_colors_rx: broadcast::Receiver<(u32, Vec<u8>)>) {
let sorted_colors_tx = self.sorted_colors_tx.clone();
let colors_tx = self.colors_tx.clone();
let display_colors_rx = self.display_colors_rx.clone();
log::debug!("start all_colors_worker");
tokio::spawn(async move {
for _ in 0..10 {
let mut rx = display_colors_rx.read().await.resubscribe();
let sorted_colors_tx = sorted_colors_tx.write().await;
let colors_tx = colors_tx.write().await;
@ -136,15 +130,14 @@ impl LedColorsPublisher {
let mut all_colors: Vec<Option<Vec<u8>>> = vec![None; display_ids.len()];
let mut start: tokio::time::Instant = tokio::time::Instant::now();
log::info!("start all_colors_worker");
log::debug!("start all_colors_worker task");
loop {
// log::info!("display_colors_rx changed");
let color_info = rx.recv().await;
let color_info = display_colors_rx.recv().await;
if let Err(err) = color_info {
match err {
broadcast::error::RecvError::Closed => {
break;
return;
}
broadcast::error::RecvError::Lagged(_) => {
warn!("display_colors_rx lagged");
@ -200,9 +193,6 @@ impl LedColorsPublisher {
tokio::spawn(async move {
let publisher = Self::global().await;
let mut inner_tasks_version = inner_tasks_version.write().await;
*inner_tasks_version = inner_tasks_version.overflowing_add(1).0;
let config_manager = ConfigManager::global().await;
let mut config_receiver = config_manager.clone_config_update_receiver();
@ -221,113 +211,34 @@ impl LedColorsPublisher {
let configs = configs.unwrap();
let mut inner_tasks_version = inner_tasks_version.write().await;
*inner_tasks_version = inner_tasks_version.overflowing_add(1).0;
drop(inner_tasks_version);
let (display_colors_tx, display_colors_rx) = broadcast::channel::<(u32, Vec<u8>)>(8);
for sample_point_group in configs.sample_point_groups.clone() {
let display_id = sample_point_group.display_id;
let sample_points = sample_point_group.points;
publisher.start_one_display_colors_fetcher(display_id, sample_points);
publisher.start_one_display_colors_fetcher(
display_id,
sample_points,
display_colors_tx.clone(),
);
}
let display_ids = configs.sample_point_groups;
publisher.start_all_colors_worker(
display_ids.iter().map(|c| c.display_id).collect(),
configs.mappers,
display_colors_rx,
);
break;
}
});
// tokio::spawn(async move {
// loop {
// let sorted_colors_tx = sorted_colors_tx.write().await;
// let colors_tx = colors_tx.write().await;
// let screenshot_manager = ScreenshotManager::global().await;
// let config_manager = ConfigManager::global().await;
// let config_receiver = config_manager.clone_config_update_receiver();
// let configs = config_receiver.borrow().clone();
// let configs = Self::get_colors_configs(&configs).await;
// if let Err(err) = configs {
// warn!("Failed to get configs: {}", err);
// sleep(Duration::from_millis(100)).await;
// continue;
// }
// let configs = configs.unwrap();
// let mut merged_screenshot_receiver =
// screenshot_manager.clone_merged_screenshot_rx().await;
// let mut screenshots = HashMap::new();
// // let mut start = tokio::time::Instant::now();
// loop {
// let screenshot = merged_screenshot_receiver.recv().await;
// if let Err(err) = screenshot {
// match err {
// tokio::sync::broadcast::error::RecvError::Closed => {
// warn!("closed");
// continue;
// }
// tokio::sync::broadcast::error::RecvError::Lagged(_) => {
// warn!("lagged");
// continue;
// }
// }
// }
// let screenshot = screenshot.unwrap();
// // log::info!("got screenshot: {:?}", screenshot.display_id);
// screenshots.insert(screenshot.display_id, screenshot);
// if screenshots.len() == configs.sample_point_groups.len() {
// // log::info!("{}", start.elapsed().as_millis().to_string());
// {
// let screenshots = configs
// .sample_point_groups
// .iter()
// .map(|strip| screenshots.get(&strip.display_id).unwrap())
// .collect::<Vec<_>>();
// let colors = screenshot_manager
// .get_all_colors(&configs.sample_point_groups, &screenshots)
// .await;
// let sorted_colors =
// ScreenshotManager::get_sorted_colors(&colors, &configs.mappers)
// .await;
// match colors_tx.send(colors) {
// Ok(_) => {
// // log::info!("colors updated");
// }
// Err(_) => {
// warn!("colors update failed");
// }
// }
// match sorted_colors_tx.send(sorted_colors) {
// Ok(_) => {
// // log::info!("colors updated");
// }
// Err(_) => {
// warn!("colors update failed");
// }
// }
// }
// // screenshots.clear();
// // start = tokio::time::Instant::now();
// }
// }
// }
// });
let rx = self.sorted_colors_rx.clone();
tokio::spawn(async move {
let mut rx = rx.read().await.clone();
@ -380,6 +291,7 @@ impl LedColorsPublisher {
let mut colors_configs = Vec::new();
let mut merged_screenshot_receiver = screenshot_manager.clone_merged_screenshot_rx().await;
merged_screenshot_receiver.resubscribe();
let mut screenshots = HashMap::new();
@ -431,10 +343,11 @@ impl LedColorsPublisher {
colors_configs.push(colors_config);
}
log::debug!("got all colors configs: {:?}", colors_configs.len());
return Ok(AllColorConfig {
sample_point_groups: colors_configs,
mappers,
// screenshot_receivers: local_rx_list,
});
}
}