Conversation

griffinmilsap (Collaborator) commented Sep 17, 2025

Message Channel Backend Rework

Summary

This PR introduces a major rework of the ezmsg Pub/Sub backend, replacing the direct publisher→subscriber TCP link model with an intermediary Message Channel abstraction.

The new design substantially reduces redundant serialization between processes, improves fan-out efficiency, and introduces a unified point for telemetry, profiling, and performance tracking.


Motivation

In the current (dev) branch of ezmsg:

  • Each Publisher establishes direct TCP connections to each Subscriber.
  • When multiple subscribers in the same process subscribe to the same publisher, each message is serialized and transmitted multiple times to that process.
  • This leads to redundant (de)serialization and avoidable CPU overhead — especially in high-throughput fan-out configurations.
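The cost can be illustrated with a toy sketch (hypothetical names, not the ezmsg API): with N subscribers in a destination process, the old model serializes the same message N times, while a per-process channel serializes it once.

```python
import pickle

# Toy message standing in for a typical ezmsg payload
message = {"data": b"\x00" * 1_000_000, "ts": 0.0}
n_subscribers = 4

# Old model: one serialization per subscriber connection
per_subscriber = [pickle.dumps(message) for _ in range(n_subscribers)]

# Channel model: serialize once, send the same bytes to the process once
once = pickle.dumps(message)

# Bytes pushed over the wire: N copies vs. 1 copy
wire_old = sum(len(b) for b in per_subscriber)
wire_new = len(once)
print(wire_old, wire_new)  # old is n_subscribers times larger
```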

Example: Redundant Serialization (Current Backend)

```mermaid
flowchart LR
  subgraph ProcessA
    P[Publisher]
  end

  subgraph ProcessB
    S1[Subscriber 1]
    S2[Subscriber 2]
  end

  P -- "TCP Serialized Message #1" --> S1
  P -- "TCP Serialized Message #2" --> S2
```

This design scales poorly as the number of subscribers per process increases.


Solution: Message Channels

This PR introduces Message Channels, lightweight intermediaries that handle message fan-out and deserialization per process, not per subscriber.

  • Each Publisher connects to one or more Message Channels (via shared memory, TCP, or locally).
  • Each Message Channel manages the flow of messages to multiple subscribers within the same process.
  • Messages are serialized/deserialized once per process, then distributed to all local subscribers.
  • A global ChannelRegistry (similar to the prior message cache) manages available channels and their endpoints.
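A minimal sketch of the per-process fan-out idea (hypothetical names; the real Channel/ChannelRegistry implementation differs): the channel deserializes each incoming frame once, then hands the resulting object to every local subscriber queue.

```python
import asyncio
import pickle

class LocalChannel:
    """Toy per-process message channel: deserialize once, fan out locally."""

    def __init__(self) -> None:
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(q)
        return q

    def deliver(self, frame: bytes) -> None:
        # One deserialization per process...
        obj = pickle.loads(frame)
        # ...then the same in-memory object is shared with every subscriber.
        for q in self._subscribers:
            q.put_nowait(obj)

ch = LocalChannel()
s1, s2 = ch.subscribe(), ch.subscribe()
ch.deliver(pickle.dumps({"sample": 42}))
m1, m2 = s1.get_nowait(), s2.get_nowait()
print(m1, m1 is m2)  # both subscribers received the exact same object
```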

Optimized Flow (New Backend)

```mermaid
flowchart LR
  subgraph ProcessA
    P[Publisher]
  end

  subgraph ProcessB
    CH[Message Channel]
    S1[Subscriber 1]
    S2[Subscriber 2]
  end

  P -- "Serialized Once" --> CH
  CH -- "Shared Message" --> S1
  CH -- "Shared Message" --> S2
```

Benefits

  • 🚀 Major fan-out performance improvement — single deserialization per process rather than per subscriber.
  • 🧰 Unified telemetry and profiling hooks — channels expose metrics for throughput, latency, and subscriber timing.
  • 🧩 Cleaner layering & extensibility — clear API surface for alternate transports and profilers.

Performance Validation

All benchmarks were executed on Apple Silicon (M3 Pro, 14 cores, 48 GB RAM) using Python 3.11.11 and ezmsg 3.6.2.
Metrics compare the new Message Channel backend (this PR) against the previous direct pub/sub backend (dev branch).

| Configuration | Transport | Key Observation | Result |
| --- | --- | --- | --- |
| Fan-In (many pubs → one sub) | Local / SHM | Consolidation throughput up significantly; lower mean latency | ✅ +15–30 % |
| Fan-Out (one pub → many subs) | Local / SHM | Major improvement; single deserialization per process yields far higher fan-out rates | ✅ +200–400 % |
| Fan-Out | TCP | Slight regression due to added channel queueing; within expected bounds | ⚠️ −5–10 % |
| Relay (pub → relay → sub) | Local / SHM | Clear throughput/latency gains; reduced per-hop cost | ✅ +20–60 % |
| Relay | TCP | Generally improved except at very high payloads; minor queueing overhead | ⚠️ ±10 % |
| Spread (multi-proc) | SHM / TCP | Performance parity or modest gains; scaling stable up to 16 clients | ⚖️ ≈ 0–15 % |
| Overall | — | Aggregate sample/data rates increased in 9 of 12 test groups | 🟢 Net positive |

Highlights

  • 🧩 Fan-out workloads benefit most: single deserialization per process → ~3–4× sample rate increase.
  • ⚙️ Relay and fan-in paths show consistent latency improvements (−30 to −50 %).
  • 🧠 TCP regressions limited to ~10 % due to intentional buffering in message channels.
  • 🧮 Variability below ±10 % considered statistical noise; no significant regressions observed.

The full HTML report (report_griff_working.html) is attached to this PR with complete metrics, plots, and configuration details.


Summary of Impact

| Area | Change | Impact |
| --- | --- | --- |
| Core Pub/Sub | Direct pub→sub → pub→channel→sub | ✅ Reduced serialization overhead |
| Performance | Fan-out and local efficiency | 🚀 Significant improvement |
| Compatibility | Publisher/Subscriber API surface unchanged | ⚙️ Transparent to users |

This backend rework modernizes ezmsg's core dataflow architecture and lays the groundwork for future enhancements in telemetry, profiling, and adaptive scheduling.

@griffinmilsap griffinmilsap marked this pull request as ready for review November 17, 2025 16:24
griffinmilsap (Collaborator, Author):

Also wanted to mention that this PR includes #199 and supersedes #189.

griffinmilsap (Collaborator, Author) commented Nov 17, 2025

Unit tests for Channel, ChannelManager, and MessageCache would be a very valuable contribution to this particular PR. I have a laundry list of refactoring/organizational/maintenance fixes I'd like to implement, but I'm unsure whether they should also be part of this (already massive) PR. Happy to put this back into draft to get these unit tests written, and optionally accomplish the following:

TODO:

  • Ruff formatting
  • Refactor MessageCache to its own file
  • Refactor ChannelManager to its own file
  • Remove ServiceManager and roll it into GraphService
  • Remove ThreadedAsyncServer and roll it into GraphServer
  • Docstrings for Channel and ChannelManager
  • Expose Publisher, Subscriber, and GraphContext at the top-level module (ez.Publisher, ez.GraphContext, etc.)
  • GitHub Pages docs covering the low-level Publisher/Subscriber API
  • GitHub Pages docs covering message transport with sequence diagrams

Additionally, it would be valuable to implement all of the StreamReader/StreamWriter machinery with asyncio.Protocols (as Preston did in fix/comms-protocols), even if it results in small regressions on the current perf suite. Notably, the protocol implementation would make batch send/receive much more efficient, and the default perf tests only exercise num_buffers=1.
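For context, the Protocol-based approach mentioned here replaces coroutine-per-read StreamReader loops with callback-driven transports. A generic asyncio sketch (unrelated to ezmsg internals) shows why batching gets cheaper: all complete frames in one received chunk are parsed in a single wakeup.

```python
import asyncio

class LengthPrefixedProtocol(asyncio.Protocol):
    """Callback-driven framing: asyncio hands over bytes as they arrive,
    so multiple frames batched in one TCP segment parse in one callback."""

    def __init__(self) -> None:
        self._buf = bytearray()
        self.frames: list[bytes] = []

    def data_received(self, data: bytes) -> None:
        self._buf += data
        # Parse every complete [4-byte length][payload] frame in the buffer
        while len(self._buf) >= 4:
            length = int.from_bytes(self._buf[:4], "big")
            if len(self._buf) < 4 + length:
                break  # wait for the rest of this frame
            self.frames.append(bytes(self._buf[4:4 + length]))
            del self._buf[:4 + length]

proto = LengthPrefixedProtocol()
# Two frames arriving back-to-back in a single chunk are both parsed at once
chunk = (len(b"hello").to_bytes(4, "big") + b"hello"
         + len(b"world").to_bytes(4, "big") + b"world")
proto.data_received(chunk)
print(proto.frames)  # [b'hello', b'world']
```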

I'd like all of this to be done before moving on to separate PRs/enhancements for:

  • Telemetry via GraphServer supporting system profiling
  • Re-implementing the existing comms between all GraphClients and the GraphServer, as well as the Publisher/Channel comms, using asyncio.Protocols
  • Refactoring the ezmsg command line with subparsers to build perf testing directly into the CLI (ezmsg perf instead of ezmsg-perf)

griffinmilsap (Collaborator, Author) commented Nov 20, 2025

Okay, I think this is ready for merge into dev. @cboulay @KonradPilch lmk if you feel the same way and I'll merge.

KonradPilch (Contributor) left a comment:

First, AMAZING work. Thanks @griffinmilsap.

Just leaving some comments, most are minor.

Very happy with the changes. If you think this is in a state you're happy with @griffinmilsap, I'd be ok with merging once you have looked at my comments and decided what does and doesn't need actioning.

KonradPilch (Contributor) commented Nov 24, 2025

> GitHub Pages docs covering the low-level Publisher/Subscriber API
> GitHub Pages docs covering message transport with sequence diagrams

Also @griffinmilsap, feel free to add an explanation of how the backend works now to the docs repo. I feel like the explanation you've provided in this PR is exactly what would work best there. If you want to move on to the list of further work you've got above, let me know and I can put the docs work on my todo instead.

griffinmilsap (Collaborator, Author) replied:

> First, AMAZING work. Thanks @griffinmilsap.
>
> Just leaving some comments, most are minor.
>
> Very happy with the changes. If you think this is in a state you're happy with @griffinmilsap, I'd be ok with merging once you have looked at my comments and decided what does and doesn't need actioning.

You are a gentleman and a scholar, huge thanks for looking over this absolutely massive PR. Going through your comments one by one now; thanks!!!

KonradPilch (Contributor) left a comment:

Everything looks good to me now!

@griffinmilsap griffinmilsap merged commit e9a76f0 into dev Nov 25, 2025
8 checks passed