Skip to content

Commit e9a76f0

Browse files
Merge pull request #198 from ezmsg-org/griff/working
Backend Rework: Message Channels
2 parents b7023ea + 66bdd6d commit e9a76f0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+3979
-1604
lines changed

.git-blame-ignore-revs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@
22
5b4c220154d33cac15a75d3d7d978a75e67f2b8a
33
# Ran `black` formatter on entire codebase and fixed non-standard line-endings with `dos2unix`.
44
4e4f20be40a73f2162a565732bae76ea0c812739
5+
# chore: ruff formatting
6+
5ca5711e7714042f23d1286abf946217d75318c2

docs/source/conf.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# Configuration file for the Sphinx documentation builder.
22

33
import os
4-
import sys
54

65
# -- Project information --------------------------
76

@@ -75,7 +74,7 @@
7574
html_static_path = ["_static"]
7675

7776
# Timestamp is inserted at every page bottom in this strftime format.
78-
html_last_updated_fmt = '%Y-%m-%d'
77+
html_last_updated_fmt = "%Y-%m-%d"
7978

8079
# -- Options for EPUB output --------------------------
8180
epub_show_urls = "footnote"
@@ -95,8 +94,10 @@ def linkcode_resolve(domain, info):
9594
else:
9695
return f"{code_url}src/{filename}.py"
9796

97+
9898
# -- Options for graphviz -----------------------------
9999
graphviz_output_format = "svg"
100100

101+
101102
def setup(app):
102-
app.add_css_file("custom.css")
103+
app.add_css_file("custom.css")

examples/ezmsg_attach.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
if __name__ == "__main__":
66
print("This example attaches to the system created/run by ezmsg_toy.py.")
77
log = DebugLog()
8-
ez.run(log, connections=(("TestSystem/PING/OUTPUT", log.INPUT),))
8+
ez.run(LOG=log, connections=(("GLOBAL_PING_TOPIC", log.INPUT),))

examples/ezmsg_configs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,5 +239,5 @@ def network(self) -> ez.NetworkDefinition:
239239
]
240240

241241
for system in test_systems:
242-
ez.logger.info(f"Testing { system.__name__ }")
242+
ez.logger.info(f"Testing {system.__name__}")
243243
ez.run(system())

examples/ezmsg_toy.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ def process_components(self):
192192

193193
ez.run(
194194
SYSTEM=system,
195-
# connections = [
196-
# ( system.PING.OUTPUT, 'PING_OUTPUT' ),
197-
# ( 'FOO_SUB', system.FOOSUB.INPUT )
198-
# ]
195+
connections=[
196+
# Make PING.OUTPUT available on a topic ezmsg_attach.py
197+
(system.PING.OUTPUT, "GLOBAL_PING_TOPIC"),
198+
],
199199
)

examples/lowlevel_api.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import asyncio
2+
3+
import ezmsg.core as ez
4+
5+
PORT = 12345
6+
MAX_COUNT = 100
7+
TOPIC = "/TEST"
8+
9+
10+
async def handle_pub(pub: ez.Publisher) -> None:
11+
print("Publisher Task Launched")
12+
13+
count = 0
14+
15+
while True:
16+
await pub.broadcast(f"{count=}")
17+
await asyncio.sleep(0.1)
18+
count += 1
19+
if count >= MAX_COUNT:
20+
break
21+
22+
print("Publisher Task Concluded")
23+
24+
25+
async def handle_sub(sub: ez.Subscriber) -> None:
26+
print("Subscriber Task Launched")
27+
28+
rx_count = 0
29+
while True:
30+
async with sub.recv_zero_copy() as msg:
31+
# Uncomment if you want to witness backpressure!
32+
# await asyncio.sleep(0.15)
33+
print(msg)
34+
35+
rx_count += 1
36+
if rx_count >= MAX_COUNT:
37+
break
38+
39+
print("Subscriber Task Concluded")
40+
41+
42+
async def host(host: str = "127.0.0.1"):
43+
# Manually create a GraphServer
44+
server = ez.GraphServer()
45+
server.start((host, PORT))
46+
47+
print(f"Created GraphServer @ {server.address}")
48+
49+
try:
50+
test_pub = await ez.Publisher.create(TOPIC, (host, PORT), host=host)
51+
test_sub1 = await ez.Subscriber.create(TOPIC, (host, PORT))
52+
test_sub2 = await ez.Subscriber.create(TOPIC, (host, PORT))
53+
54+
await asyncio.sleep(1.0)
55+
56+
pub_task = asyncio.Task(handle_pub(test_pub))
57+
sub_task_1 = asyncio.Task(handle_sub(test_sub1))
58+
sub_task_2 = asyncio.Task(handle_sub(test_sub2))
59+
60+
await asyncio.wait([pub_task, sub_task_1, sub_task_2])
61+
62+
test_pub.close()
63+
test_sub1.close()
64+
test_sub2.close()
65+
66+
for future in asyncio.as_completed(
67+
[
68+
test_pub.wait_closed(),
69+
test_sub1.wait_closed(),
70+
test_sub2.wait_closed(),
71+
]
72+
):
73+
await future
74+
75+
finally:
76+
server.stop()
77+
78+
print("Done")
79+
80+
81+
async def attach_client(host: str = "127.0.0.1"):
82+
83+
sub = await ez.Subscriber.create(TOPIC, (host, PORT))
84+
85+
try:
86+
while True:
87+
async with sub.recv_zero_copy() as msg:
88+
# Uncomment if you want to see EXTREME backpressure!
89+
# await asyncio.sleep(1.0)
90+
print(msg)
91+
92+
except asyncio.CancelledError:
93+
pass
94+
95+
finally:
96+
sub.close()
97+
await sub.wait_closed()
98+
print("Detached")
99+
100+
101+
if __name__ == "__main__":
102+
from dataclasses import dataclass
103+
from argparse import ArgumentParser
104+
105+
parser = ArgumentParser()
106+
parser.add_argument("--attach", action="store_true", help="attach to running graph")
107+
parser.add_argument("--host", default="0.0.0.0", help="hostname for graphserver")
108+
109+
@dataclass
110+
class Args:
111+
attach: bool
112+
host: str
113+
114+
args = Args(**vars(parser.parse_args()))
115+
116+
if args.attach:
117+
asyncio.run(attach_client(host=args.host))
118+
else:
119+
asyncio.run(host(host=args.host))

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dev = [
2424
{include-group = "lint"},
2525
{include-group = "test"},
2626
"pre-commit>=4.3.0",
27+
"viztracer>=1.0.4",
2728
]
2829
lint = [
2930
"flake8>=7.3.0",
@@ -35,6 +36,7 @@ test = [
3536
"pytest-asyncio>=1.1.0",
3637
"pytest-cov>=6.2.1",
3738
"xarray>=2025.6.1",
39+
"psutil>=7.1.0",
3840
]
3941
docs = [
4042
{include-group = "axisarray"},
@@ -51,6 +53,7 @@ axisarray = [
5153

5254
[project.scripts]
5355
ezmsg = "ezmsg.core.command:cmdline"
56+
ezmsg-perf = "ezmsg.util.perf.command:command"
5457

5558
[project.optional-dependencies]
5659
axisarray = [

src/ezmsg/core/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
"GraphServer",
2525
"GraphContext",
2626
"run_command",
27+
"Publisher",
28+
"Subscriber",
2729
# All following are deprecated
2830
"System",
2931
"run_system",
@@ -42,6 +44,8 @@
4244
from .graphserver import GraphServer
4345
from .graphcontext import GraphContext
4446
from .command import run_command
47+
from .pubclient import Publisher
48+
from .subclient import Subscriber
4549

4650
# Following imports are deprecated
4751
from .backend import run_system

src/ezmsg/core/addressable.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@
44
class Addressable:
55
"""
66
Base class for objects that can be addressed within the ezmsg system.
7-
8-
Addressable objects have a hierarchical address structure consisting of
7+
8+
Addressable objects have a hierarchical address structure consisting of
99
a location path and a name, similar to a filesystem path.
1010
"""
11+
1112
_name: str | None
1213
_location: list[str] | None
1314

1415
def __init__(self) -> None:
1516
"""
1617
Initialize an Addressable object.
17-
18+
1819
The name and location are initially None and must be set before
1920
the object can be properly addressed. This is achieved through
2021
the ``_set_name()`` and ``_set_location()`` methods.
@@ -38,7 +39,7 @@ def _set_location(self, location: list[str] | None = None):
3839
def name(self) -> str:
3940
"""
4041
Get the name of this addressable object.
41-
42+
4243
:return: The object's name
4344
:rtype: str
4445
:raises AssertionError: If name has not been set
@@ -51,7 +52,7 @@ def name(self) -> str:
5152
def location(self) -> list[str]:
5253
"""
5354
Get the location path of this addressable object.
54-
55+
5556
:return: List of path components representing the object's location
5657
:rtype: list[str]
5758
:raises AssertionError: If location has not been set
@@ -64,10 +65,10 @@ def location(self) -> list[str]:
6465
def address(self) -> str:
6566
"""
6667
Get the full address of this object.
67-
68+
6869
The address is constructed by joining the location path and name
6970
with forward slashes, similar to a filesystem path.
70-
71+
7172
:return: The full address string
7273
:rtype: str
7374
"""

0 commit comments

Comments
 (0)