Skip to content

Commit ce4190d

Browse files
committed
implemented per-instrument lock on the server such that each instrument can only be called from one thread at a time.
- also made a better demo for the usage of threaded server
1 parent f9753ea commit ce4190d

File tree

3 files changed

+141
-66
lines changed

3 files changed

+141
-66
lines changed

instrumentserver/server/core.py

Lines changed: 76 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ def __init__(self,
169169
self._wakeup_r, self._wakeup_w = socket.socketpair()
170170
self._wakeup_r.setblocking(False)
171171
self._wakeup_w.setblocking(False)
172+
173+
# Per-instrument locks to avoid races when multiple threads talk to the same instrument concurrently
174+
self._instrument_locks: dict[str, threading.RLock] = {}
175+
self._instrument_locks_lock = threading.Lock()
172176

173177
def _runInitScript(self):
174178
if os.path.exists(self.initScript):
@@ -407,40 +411,60 @@ def _createInstrument(self, spec: InstrumentCreationSpec) -> None:
407411

408412
args = [] if spec.args is None else spec.args
409413
kwargs = dict() if spec.kwargs is None else spec.kwargs
410-
411-
new_instrument = qc.find_or_create_instrument(
412-
cls, spec.name, *args, **kwargs)
413-
if new_instrument.name not in self.station.components:
414-
self.station.add_component(new_instrument)
415-
416-
self.instrumentCreated.emit(bluePrintFromInstrumentModule(new_instrument.name, new_instrument),
417-
args, kwargs)
414+
415+
# lock based on the intended instrument name
416+
lock = self._get_lock_for_target(spec.name)
417+
if lock is None:
418+
# in case name isn't in station yet, just guard creation with the dict lock
419+
lock = self._instrument_locks_lock # coarse but fine for this rare operation
420+
421+
with lock:
422+
new_instrument = qc.find_or_create_instrument(
423+
cls, spec.name, *args, **kwargs)
424+
425+
if new_instrument.name not in self.station.components:
426+
self.station.add_component(new_instrument)
427+
428+
self.instrumentCreated.emit(bluePrintFromInstrumentModule(new_instrument.name, new_instrument),
429+
args, kwargs)
418430

419431
def _callObject(self, spec: CallSpec) -> Any:
420432
"""Call some callable found in the station."""
421433
obj = nestedAttributeFromString(self.station, spec.target)
422434
args = spec.args if spec.args is not None else []
423435
kwargs = spec.kwargs if spec.kwargs is not None else {}
424-
ret = obj(*args, **kwargs)
425-
426-
# Check if a new parameter is being created.
427-
self._newOrDeleteParameterDetection(spec, args, kwargs)
428-
429-
if isinstance(obj, Parameter):
430-
if len(args) > 0:
431-
self.parameterSet.emit(spec.target, args[0])
432-
433-
# Broadcast changes in parameter values.
434-
self._broadcastParameterChange(ParameterBroadcastBluePrint(spec.target, 'parameter-update', args[0]))
436+
437+
def _invoke():
438+
ret = obj(*args, **kwargs)
439+
440+
# Check if a new parameter is being created.
441+
self._newOrDeleteParameterDetection(spec, args, kwargs)
442+
443+
if isinstance(obj, Parameter):
444+
if len(args) > 0:
445+
self.parameterSet.emit(spec.target, args[0])
446+
447+
# Broadcast changes in parameter values.
448+
self._broadcastParameterChange(ParameterBroadcastBluePrint(spec.target, 'parameter-update', args[0]))
449+
else:
450+
self.parameterGet.emit(spec.target, ret)
451+
452+
# Broadcast calls of parameters.
453+
self._broadcastParameterChange(ParameterBroadcastBluePrint(spec.target, 'parameter-call', ret))
435454
else:
436-
self.parameterGet.emit(spec.target, ret)
437-
438-
# Broadcast calls of parameters.
439-
self._broadcastParameterChange(ParameterBroadcastBluePrint(spec.target, 'parameter-call', ret))
455+
self.funcCalled.emit(spec.target, args, kwargs, ret)
456+
457+
return ret
458+
459+
# Get the appropriate per-instrument lock, if any
460+
lock = self._get_lock_for_target(spec.target)
461+
if lock is None:
462+
# Not an instrument (e.g. Station-level call); just invoke
463+
return _invoke()
440464
else:
441-
self.funcCalled.emit(spec.target, args, kwargs, ret)
442-
443-
return ret
465+
# Serialize access to this instrument across threads
466+
with lock:
467+
return _invoke()
444468

445469
def _getBluePrint(self, path: str) -> Union[InstrumentModuleBluePrint,
446470
ParameterBluePrint,
@@ -534,7 +558,32 @@ def _newOrDeleteParameterDetection(self, spec, args, kwargs):
534558
pb = ParameterBroadcastBluePrint(name,
535559
'parameter-deletion')
536560
self._broadcastParameterChange(pb)
537-
561+
562+
def _get_lock_for_target(self, target: str) -> Optional[threading.RLock]:
563+
"""
564+
Given a call target like 'dac1.ch1.offset' or 'awg.ch2.set_sq_wave',
565+
return a per-instrument lock if the root is one of the station components.
566+
Otherwise, return None (no locking needed).
567+
"""
568+
# todo: here we assume each instrument can only be used by one thread at a time, which is generally the safer option.
569+
# There might exists hardware that actually supports independent, concurrent control of different channels,
570+
# in which case we might want to add a tag to the instrument and disable the locking here.
571+
if not target:
572+
return None
573+
574+
# First token before the first dot: assumed to be instrument name
575+
root = target.split('.')[0]
576+
577+
# Only lock if this actually corresponds to an instrument in the station
578+
if root not in self.station.components:
579+
return None
580+
581+
with self._instrument_locks_lock:
582+
lock = self._instrument_locks.get(root)
583+
if lock is None:
584+
lock = threading.RLock()
585+
self._instrument_locks[root] = lock
586+
return lock
538587

539588
def startServer(port: int = 5555,
540589
allowUserShutdown: bool = False,
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from instrumentserver.client import Client
2+
import sys
3+
import time
4+
5+
'''
6+
Simple concurrency demo.
7+
8+
Usage (server already running):
9+
10+
Terminal A (long-running call on dummy1):
11+
python demo_concurrency.py ramp
12+
13+
Terminal B (start while A is still running):
14+
15+
# Case 1: same instrument -> should block behind ramp
16+
python demo_concurrency.py same
17+
18+
# Case 2: different instrument -> should return immediately
19+
python demo_concurrency.py other
20+
21+
22+
23+
This mimics the case when one client is ramping bias voltage, while another client wants to change a parameter of
24+
a different instrument. Or more commonly, a client is ramping bias voltage, and we want to view parameter of an instrument
25+
in the server gui (which also is basically another client that runs in a different thread.)
26+
'''
27+
28+
if __name__ == "__main__":
29+
role = sys.argv[1] if len(sys.argv) > 1 else "ramp"
30+
print(f"[demo] role = {role}")
31+
32+
cli = Client(timeout=50, port=5555)
33+
34+
# We only create what we need for the role, but this is cheap anyway
35+
dummy1 = cli.find_or_create_instrument(
36+
"test1",
37+
"instrumentserver.testing.dummy_instruments.generic.DummyInstrumentTimeout",
38+
)
39+
dummy2 = cli.find_or_create_instrument(
40+
"test2",
41+
"instrumentserver.testing.dummy_instruments.generic.DummyInstrumentTimeout",
42+
)
43+
44+
t0 = time.time()
45+
46+
if role == "ramp": # within a single process, operations are always blocking
47+
print("[ramp] dummy1.get_random_timeout(10)")
48+
print(dummy1.get_random_timeout(10))
49+
print("[after ramp] dummy2.get_random()")
50+
print(dummy2.get_random())
51+
52+
elif role == "same": # from a different process, operations on the same instrument are still blocked
53+
print("[same] dummy1.get_random() (same instrument as ramp)")
54+
print(dummy1.get_random())
55+
56+
elif role == "other": # from a different process, operations on a different instrument are NOT blocked
57+
print("[other] dummy2.get_random() (different instrument)")
58+
print(dummy2.get_random())
59+
60+
else:
61+
print(f"Unknown role {role!r}. Use 'ramp', 'same', or 'other'.")
62+
63+
print(f"[{role}] took {time.time() - t0:.3f} s")
64+
65+

instrumentserver/testing/test_async_requests/test_client.py

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)