feat: 通过增量、绝对量亮度控制。#5.

This commit is contained in:
2023-01-29 20:53:59 +08:00
parent 6ec61c1bcc
commit 4e3b765059
10 changed files with 539 additions and 65 deletions

View File

@@ -1,23 +1,35 @@
use crate::picker::led_color::LedColor;
use paris::error;
use tokio::sync::{broadcast, OnceCell};
use super::mqtt::MqttConnection;
use once_cell::sync::OnceCell;
use crate::{display, picker::led_color::LedColor};
use super::mqtt::MqttRpc;
pub struct Manager {
mqtt: MqttConnection,
client: MqttRpc,
initialized: bool,
}
impl Manager {
pub fn global() -> &'static Self {
static RPC_MANAGER: OnceCell<Manager> = OnceCell::new();
pub async fn global() -> &'static Self {
static RPC_MANAGER: OnceCell<Manager> = OnceCell::const_new();
RPC_MANAGER.get_or_init(|| Manager::new())
RPC_MANAGER.get_or_init(|| Manager::new()).await
}
pub fn new() -> Self {
let mut mqtt = MqttConnection::new();
mqtt.initialize();
Self { mqtt }
pub async fn new() -> Self {
let mut mqtt = MqttRpc::new();
let initialized = match mqtt.initialize().await {
Ok(_) => true,
Err(err) => {
error!("initialize for mqtt was failed. {:?}", err);
false
}
};
Self {
client: mqtt,
initialized,
}
}
pub async fn publish_led_colors(&self, colors: &Vec<LedColor>) -> anyhow::Result<()> {
@@ -27,29 +39,14 @@ impl Manager {
.flatten()
.collect::<Vec<u8>>();
self.mqtt
.client
.publish(
"display-ambient-light/desktop/colors",
rumqttc::QoS::AtLeastOnce,
false,
payload,
)
.await
.map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error))
self.publish_led_sub_pixels(payload).await
}
pub async fn publish_led_sub_pixels(&self, payload: Vec<u8>) -> anyhow::Result<()> {
self.client.publish_led_sub_pixels(payload).await
}
self.mqtt
.client
.publish(
"display-ambient-light/desktop/colors",
rumqttc::QoS::AtLeastOnce,
false,
payload,
)
.await
.map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error))
pub fn client(&self) -> &MqttRpc {
&self.client
}
}

View File

@@ -1,2 +1,4 @@
pub mod manager;
pub mod mqtt;
mod manager;
pub mod mqtt;
pub use manager::*;

View File

@@ -1,41 +1,106 @@
use rumqttc::{AsyncClient, MqttOptions, QoS};
use crate::display;
use image::EncodableLayout;
use paris::warn;
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
use serde::Deserialize;
use std::time::Duration;
use time::{format_description, OffsetDateTime};
use tokio::task;
use tracing::warn;
use tokio::{sync::broadcast, task};
pub struct MqttConnection {
pub client: AsyncClient,
const DISPLAY_TOPIC: &'static str = "display-ambient-light/display";
const DISPLAY_BRIGHTNESS_TOPIC: &'static str = "display-ambient-light/display/brightness";
pub struct MqttRpc {
client: AsyncClient,
change_display_brightness_tx: broadcast::Sender<display::DisplayBrightness>,
}
impl MqttConnection {
impl MqttRpc {
pub fn new() -> Self {
let mut options = MqttOptions::new("rumqtt-async", "192.168.31.11", 1883);
options.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(options, 10);
let (change_display_brightness_tx, _) =
broadcast::channel::<display::DisplayBrightness>(16);
let change_display_brightness_tx2 = change_display_brightness_tx.clone();
task::spawn(async move {
loop {
match eventloop.poll().await {
Ok(_) => {}
Ok(notification) => {
let handled = || -> anyhow::Result<()> {
println!("MQTT notification = {:?}", notification);
if let Event::Incoming(notification) = notification {
if let Incoming::Publish(notification) = notification {
match notification.topic.as_str() {
DISPLAY_BRIGHTNESS_TOPIC => {
let payload_text = String::from_utf8(
notification.payload.as_bytes().to_owned(),
)
.map_err(|err| {
anyhow::anyhow!("can not parse json. {:?}", err)
})?;
let display_brightness: display::DisplayBrightness =
serde_json::from_str(payload_text.as_str())
.map_err(|err| {
anyhow::anyhow!(
"can not deserialize display brightness. {:?}",
err
)
})?;
change_display_brightness_tx2
.send(display_brightness)
.map_err(|err| {
anyhow::anyhow!(
"can not broadcast display brightness. {:?}",
err
)
})?;
}
&_ => {}
};
}
}
Ok(())
};
if let Err(err) = handled() {
warn!("handle notification was failed. Error: {:?}", err);
}
}
Err(err) => {
println!("MQTT Error Event = {:?}", err);
}
}
}
});
Self { client }
Self {
client,
change_display_brightness_tx,
}
}
pub fn initialize(&mut self) {
self.subscribe_board();
pub async fn initialize(&mut self) -> anyhow::Result<()> {
self.subscribe_board().await?;
self.subscribe_display().await?;
self.broadcast_desktop_online();
anyhow::Ok(())
}
async fn subscribe_board(&self) {
async fn subscribe_board(&self) -> anyhow::Result<()> {
self.client
.subscribe("display-ambient-light/board/#", QoS::AtMostOnce)
.await;
.await
.map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err))
}
async fn subscribe_display(&self) -> anyhow::Result<()> {
self.client
.subscribe(format!("{}/#", DISPLAY_TOPIC), QoS::AtMostOnce)
.await
.map_err(|err| anyhow::anyhow!("subscribe board failed. {:?}", err))
}
fn broadcast_desktop_online(&mut self) {
@@ -69,4 +134,20 @@ impl MqttConnection {
}
});
}
pub async fn publish_led_sub_pixels(&self, payload: Vec<u8>) -> anyhow::Result<()> {
self.client
.publish(
"display-ambient-light/desktop/colors",
rumqttc::QoS::AtLeastOnce,
false,
payload,
)
.await
.map_err(|error| anyhow::anyhow!("mqtt publish failed. {}", error))
}
pub fn subscribe_change_display_brightness_rx(&self) -> broadcast::Receiver<display::DisplayBrightness> {
self.change_display_brightness_tx.subscribe()
}
}