diff --git a/src/input/composite_device/client.rs b/src/input/composite_device/client.rs index 16c655fb..ee8e4fe2 100644 --- a/src/input/composite_device/client.rs +++ b/src/input/composite_device/client.rs @@ -418,6 +418,7 @@ impl CompositeDeviceClient { } /// Update the input capabilities for the given target device (blocking) + #[allow(dead_code)] pub fn blocking_update_target_capabilities( &self, dbus_path: String, diff --git a/src/input/composite_device/mod.rs b/src/input/composite_device/mod.rs index edec8068..c8316c4a 100644 --- a/src/input/composite_device/mod.rs +++ b/src/input/composite_device/mod.rs @@ -813,7 +813,7 @@ impl CompositeDevice { let Some(source) = self.source_devices.get(source_id) else { continue; }; - log::debug!("Erasing effect from {source_id}"); + log::debug!("Erasing effect {effect_id} from {source_id}"); if let Err(e) = source.erase_effect(*source_effect_id).await { log::warn!("Failed to erase FF effect from {source_id}: {:?}", e); } diff --git a/src/input/source/mod.rs b/src/input/source/mod.rs index 2a4a386c..9f300476 100644 --- a/src/input/source/mod.rs +++ b/src/input/source/mod.rs @@ -494,7 +494,7 @@ impl SourceDriver } SourceCommand::EraseEffect(id, composite_dev) => { let res = match implementation.erase_effect(id) { - Ok(_) => Ok(()), + Ok(_) => composite_dev.send(Ok(())), Err(e) => { let err = format!("Failed to erase effect: {e:?}"); composite_dev.send(Err(err.into())) diff --git a/src/input/target/debug.rs b/src/input/target/debug.rs index 1300b8de..59e85b25 100644 --- a/src/input/target/debug.rs +++ b/src/input/target/debug.rs @@ -96,13 +96,17 @@ impl DebugDevice { log::debug!("Using capability report: {}", self.capability_report); // Inform the composite device that the capabilities have changed - if let Some(dbus_path) = self.dbus_path.as_ref() { + if let Some(dbus_path) = self.dbus_path.clone() { log::debug!("Updating composite device with new capabilities"); - if let Err(e) = composite_device - .blocking_update_target_capabilities(dbus_path.clone(), capabilities) - { - log::warn!("Failed to update target capabilities: {e:?}"); - } + let composite_device = composite_device.clone(); + tokio::spawn(async move { + if let Err(e) = composite_device + .update_target_capabilities(dbus_path.clone(), capabilities) + .await + { + log::warn!("Failed to update target capabilities: {e:?}"); + } + }); } // Signal that capabilities have changed diff --git a/src/input/target/steam_deck.rs b/src/input/target/steam_deck.rs index 1035fd93..4c5925b9 100644 --- a/src/input/target/steam_deck.rs +++ b/src/input/target/steam_deck.rs @@ -7,9 +7,10 @@ use std::{ collections::HashMap, error::Error, fmt::Debug, + sync::mpsc::{channel, Receiver, TryRecvError}, time::{Duration, Instant}, }; -use tokio::sync::mpsc::{channel, Receiver}; + use virtual_usb::{ usb::{ hid::{HidInterfaceBuilder, HidReportType, HidRequest, HidSubclass, InterfaceProtocol}, @@ -812,7 +813,7 @@ impl TargetInputDevice for SteamDeckDevice { &mut self, composite_device: CompositeDeviceClient, ) -> Result<(), InputError> { - let (tx, rx) = channel(1); + let (tx, rx) = channel(); let mut device_config = self.config.clone(); // Spawn a task to wait for the composite device config. This is done @@ -904,7 +905,7 @@ impl TargetInputDevice for SteamDeckDevice { device_config.product_id.to_u16(), ); - if let Err(e) = tx.send(device_config).await { + if let Err(e) = tx.send(device_config) { log::error!("Failed to send device config to target device. Got error: {e:?}"); }; }); @@ -1049,31 +1050,18 @@ impl TargetOutputDevice for SteamDeckDevice { /// USB transfers. fn poll(&mut self, _: &Option) -> Result, OutputError> { // Create and start the device if needed - if self.config_rx.is_some() { - if let Some(rx) = self.config_rx.as_ref() { - if rx.is_empty() { - // If the queue is empty, we're still waiting for a response from - // the composite device. - return Ok(vec![]); - } - } - let Some(mut rx) = self.config_rx.take() else { - return Ok(vec![]); + if let Some(rx) = self.config_rx.as_ref() { + let config = match rx.try_recv() { + Ok(config) => config, + Err(e) => match e { + TryRecvError::Empty => { + // If the queue is empty, we're still waiting for a response from + // the composite device. + return Ok(vec![]); + } + TryRecvError::Disconnected => self.config.clone(), + }, }; - - let (sync_tx, sync_rx) = std::sync::mpsc::channel(); - let default_config = self.config.clone(); - tokio::spawn(async move { - let config = match rx.recv().await { - Some(config) => config, - None => default_config, - }; - if let Err(e) = sync_tx.send(config) { - log::error!("Failed to send config to device thread: {e}"); - } - }); - - let config = sync_rx.recv().unwrap_or(self.config.clone()); let device = SteamDeckDevice::create_virtual_device(&config)?; self.device = Some(device); self.config = config; diff --git a/src/input/target/steam_deck_uhid.rs b/src/input/target/steam_deck_uhid.rs index 4ec06959..3bf3f2a6 100644 --- a/src/input/target/steam_deck_uhid.rs +++ b/src/input/target/steam_deck_uhid.rs @@ -4,6 +4,7 @@ use std::{ error::Error, fmt::Debug, fs::File, + sync::mpsc::{channel, Receiver, TryRecvError}, time::{Duration, Instant}, }; @@ -11,7 +12,6 @@ use packed_struct::{ types::{Integer, SizedInteger}, PackedStruct, }; -use tokio::sync::mpsc::{channel, Receiver}; use uhid_virt::{Bus, CreateParams, StreamError, UHIDDevice}; use crate::{ @@ -619,7 +619,7 @@ impl TargetInputDevice for SteamDeckUhidDevice { &mut self, composite_device: CompositeDeviceClient, ) -> Result<(), InputError> { - let (tx, rx) = channel(1); + let (tx, rx) = channel(); let mut device_config = self.config.clone(); // Spawn a task to wait for the composite device config. This is done @@ -713,7 +713,7 @@ impl TargetInputDevice for SteamDeckUhidDevice { device_config.product_id.to_u16(), ); - if let Err(e) = tx.send(device_config).await { + if let Err(e) = tx.send(device_config) { log::error!("Failed to send device config to target device. Got error: {e:?}"); }; }); @@ -839,31 +839,18 @@ impl TargetOutputDevice for SteamDeckUhidDevice { /// USB transfers. fn poll(&mut self, _: &Option) -> Result, OutputError> { // Create and start the device if needed - if self.config_rx.is_some() { - if let Some(rx) = self.config_rx.as_ref() { - if rx.is_empty() { - // If the queue is empty, we're still waiting for a response from - // the composite device. - return Ok(vec![]); - } - } - let Some(mut rx) = self.config_rx.take() else { - return Ok(vec![]); + if let Some(rx) = self.config_rx.as_ref() { + let config = match rx.try_recv() { + Ok(config) => config, + Err(e) => match e { + TryRecvError::Empty => { + // If the queue is empty, we're still waiting for a response from + // the composite device. + return Ok(vec![]); + } + TryRecvError::Disconnected => self.config.clone(), + }, }; - - let (sync_tx, sync_rx) = std::sync::mpsc::channel(); - let default_config = self.config.clone(); - tokio::spawn(async move { - let config = match rx.recv().await { - Some(config) => config, - None => default_config, - }; - if let Err(e) = sync_tx.send(config) { - log::error!("Failed to send config to device thread: {e}"); - } - }); - - let config = sync_rx.recv().unwrap_or(self.config.clone()); let device = SteamDeckUhidDevice::create_virtual_device(&config)?; self.device = Some(device); self.config = config; diff --git a/src/input/target/unified_gamepad.rs b/src/input/target/unified_gamepad.rs index 1d154cf2..cc7adeb3 100644 --- a/src/input/target/unified_gamepad.rs +++ b/src/input/target/unified_gamepad.rs @@ -129,13 +129,17 @@ impl UnifiedGamepadDevice { log::debug!("Using capability report: {}", self.capability_report); // Inform the composite device that the capabilities have changed - if let Some(dbus_path) = self.dbus_path.as_ref() { + if let Some(dbus_path) = self.dbus_path.clone() { log::debug!("Updating composite device with new capabilities"); - if let Err(e) = composite_device - .blocking_update_target_capabilities(dbus_path.clone(), capabilities) - { - log::warn!("Failed to update target capabilities: {e:?}"); - } + let composite_device = composite_device.clone(); + tokio::spawn(async move { + if let Err(e) = composite_device + .update_target_capabilities(dbus_path.clone(), capabilities) + .await + { + log::warn!("Failed to update target capabilities: {e:?}"); + } + }); } log::debug!("Updated capabilities"); diff --git a/src/input/target/xpad.rs b/src/input/target/xpad.rs index 6b522fce..be45c8f7 100644 --- a/src/input/target/xpad.rs +++ b/src/input/target/xpad.rs @@ -335,7 +335,7 @@ impl TargetOutputDevice for XBoxController { let effect_id = event.effect_id(); log::debug!("Upload effect: {:?} with id {}", event.effect(), effect_id); - let Some(composite_device) = composite_device else { + let Some(composite_device) = composite_device.clone() else { log::debug!("No composite device to upload effect to!"); event.set_retval(-1); continue; @@ -349,10 +349,11 @@ impl TargetOutputDevice for XBoxController { event.effect(), tx, )); - if let Err(e) = composite_device.blocking_process_output_event(upload) { - event.set_retval(-1); - return Err(e.to_string().into()); - } + tokio::task::spawn_blocking(move || { + if let Err(e) = composite_device.blocking_process_output_event(upload) { + log::error!("Failed to send ff upload to composite device: {e}"); + } + }); let effect_id = match rx.recv_timeout(Duration::from_secs(1)) { Ok(id) => id, Err(e) => {