Skip to content

Conversation

@strohel
Copy link
Contributor

@strohel strohel commented Nov 11, 2025

Description

Fixes #105, at least for me.

To be reviewed by commits:

  • first commit adds a new example eject_stream_pipewire.rs, which hangs at this point
    • this would be even better as an integration test, but it requires running pipewire, output device, the pw-link command
  • the second commit is a small cleanup/refactor to make the following change easier
  • the third commit is the actual bugfix
    • instead of handling StreamCommands in the process() callback (which runs in the RT thread), we now handle them in the PipeWire main loop. This also prompted a transition from rtrb to pipewire::channel; this makes their handling independent
    • a Mutex is introduced in the audio callback but we never wait on it in the audio process() callback. I tried hard to avoid it, but I believe it is necessary (short of unsafe code), as we need &mut access to the Option<Callback> in two separate threads.

Type of Change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • Code cleanup or refactor

How Has This Been Tested?

Checklist:

  • My code follows the style guidelines of this project
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • Wherever possible, I have added tests that prove my fix is effective or that my feature works. For changes that need to be validated manually (i.e. a new audio driver), use examples that can be run to easily validate them.
  • New and existing unit tests pass locally with my changes
  • I have checked my code and corrected any misspellings

CC @mbernat. Supersedes tonarino#5.

..instead of sending it through a command channel.

I haven't found the reason to do this the more complicated way..?
@strohel strohel changed the title PipeWire fix stopped stream ejection PipeWire: fix stopped stream ejection Nov 11, 2025

// If we cannot lock the stream inner data, it means that it is being ejected in
// the PipeWire main loop right now. Note that try_lock() never blocks.
let Ok(mut inner) = inner.try_lock() else {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try_block never blocks, but still issues a syscall that can end up with a thread context switch, which is still an unbounded wait that breaks realtime programming rules.

This is why I used the channel method to send the callback back from the audio thread, as that one does not use synchronization and is truly lock-free and wait-free.

I think you can keep the rest of the fix, but revert the Mutex change, that would solve the underlying issue but keep the realtime safety.

Copy link
Contributor Author

@strohel strohel Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @SolarLiner, I understand that the Mutex is problematic.

I don't see an easy way to to remove it though, but please check my logic:

  1. The Option<Callback> with user's callback needs to be shared between 2 threads, the pipewire main loop (where we can receive events even while the stream is paused) and the pipewire rt thread (there process() is called)
  2. We thus need to put it under an Arc to safely share it between the 2 threads
  3. Arc<T> rightfully requires its T to be Send + Sync, and only provides &T (and not &mut T) when there are multiple references.
  4. The user-supplied code expects &mut callback, so we need a wrapper with interior mutability that is also Send + Sync even when the wrapped type isn't. Hence Mutex.

However, this was just trying to stay within std and safe zone, we have more options if we wander to 3rd party crates or unsafe territory.

https://docs.rs/atomic-option/latest/atomic_option/struct.AtomicOption.html has the most suitable API, but the last release is 9 (!) years old, and it has soundness issues and even RUSTSEC advisory filed.

https://docs.rs/crossbeam/latest/crossbeam/atomic/struct.AtomicCell.html seems to be its maintained successor, but would require a bit more boiler plate around it.

We could also rely more on pipewire for thread safery/synchronization (and sprinkle unsafe to please Rust), but I'm wary of such path: as far as I've seen, pipewire thread safety documentation is scarce and pipewire-rs has unsound holes in its API (https://gitlab.freedesktop.org/pipewire/pipewire-rs/-/issues/103, missing Send (Sync) bounds in its API). This is not just theoretical: I got segmentation faults when I used Rc<RefCell<...>> instead of Arc<Mutex<...>>.

I'll give it more thought/experiment. What do you think?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fundamentally, there is no concurrent usage of the callback, from what I understand. Either the stream is running, and the audio thread uses the callback, or it's paused, and the PipeWire main thread uses the callback to continue event processing. If we can convince Rust of that fact, we can get away with a legal Send and !Sync solution.

I have used atomic_refcell in the past for solutions like this, this one is backed by a single atomic ref-count. Strictly speaking that is still overkill, we could make our own with a simple atomic boolean, but the AtomicRefCell solution would work.

Copy link
Contributor Author

@strohel strohel Nov 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SolarLiner I was able to come up with a way that avoids the Mutex using a trick based on Arc+Weak+Drop+channel and line or two of unsafe, please take a look. See the third commit, which is entirely replaced.

Fundamentally, there is no concurrent usage of the callback, from what I understand. Either the stream is running, and the audio thread uses the callback, or it's paused, and the PipeWire main thread uses the callback to continue event processing. If we can convince Rust of that fact, we can get away with a legal Send and !Sync solution.

If we speak about user_data in https://pipewire.pages.freedesktop.org/pipewire-rs/pipewire/stream/struct.Stream.html#method.add_local_listener_with_user_data (that we use for StreamInner), then I think pipewire will happily call the various callbacks listed in https://pipewire.pages.freedesktop.org/pipewire-rs/pipewire/stream/struct.ListenerLocalBuilder.html concurrently with process() in the RT thread.

It would be most convenient if pipewire guaranteed that process() is only called after state_changed(Streaming) and before state_changed(Paused|Unconnected|..) and never concurrently with them. But I haven't seen such a guarantee documented (or implied) anywhere.

Attempt to fix SolarLiner#105

v2: don't use a `Mutex`, do some Arc/Weak/Drop/channel/unsave tricks instead.
@strohel strohel force-pushed the pipewire-fix-stopped-stream-eject branch from 1bfd825 to 1359359 Compare November 23, 2025 20:45
@strohel strohel requested a review from SolarLiner November 23, 2025 21:43
@strohel
Copy link
Contributor Author

strohel commented Nov 28, 2025

I just learned about this pipewire property:

  • node.always-process = false
    When the node is active, it will always be joined with a driver node, even when nothing is linked to the node. Setting this property to true also implies node.want-driver = true.
    This is the default for JACK nodes, that always need their process callback called.

So in theory setting this property could fix the issue without the complexity of this PR (but with some side-effects). I'll give it a try.

@mbernat
Copy link
Contributor

mbernat commented Dec 4, 2025

@strohel Were you able to try node.always-process in the end?

@strohel
Copy link
Contributor Author

strohel commented Dec 11, 2025

@strohel Were you able to try node.always-process in the end?

I haven't tried yet, will do once I find time. I think it will also break our "callback not called" detection, but that's not something the upstream should be concerned about.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

pipewire: cannot eject (drop) disconnected stream

3 participants