From 34c9a21925c29a878d586ab67ae9650fbb7622b3 Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Tue, 30 Sep 2025 09:43:41 +0200 Subject: [PATCH 1/6] Wip jinja2 generate --- .../stanford_research/SR86x.jinja | 652 +++++++++++++++ .../stanford_research/SR86x.py | 774 ++---------------- .../stanford_research/_SR86x_modules.py | 665 +++++++++++++++ .../stanford_research/generate_code.py | 14 + 4 files changed, 1412 insertions(+), 693 deletions(-) create mode 100644 src/qcodes/instrument_drivers/stanford_research/SR86x.jinja create mode 100644 src/qcodes/instrument_drivers/stanford_research/_SR86x_modules.py create mode 100644 src/qcodes/instrument_drivers/stanford_research/generate_code.py diff --git a/src/qcodes/instrument_drivers/stanford_research/SR86x.jinja b/src/qcodes/instrument_drivers/stanford_research/SR86x.jinja new file mode 100644 index 000000000000..772a3dbaef57 --- /dev/null +++ b/src/qcodes/instrument_drivers/stanford_research/SR86x.jinja @@ -0,0 +1,652 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, ClassVar + +from qcodes.instrument import ( + ChannelList, + ChannelTuple, + VisaInstrument, + VisaInstrumentKWArgs, +) +from qcodes.validators import ComplexNumbers, Enum, Ints, Numbers + +from ._SR86x_modules import SR86xBuffer, SR86xDataChannel + +if TYPE_CHECKING: + from typing_extensions import Unpack + + from qcodes.parameters import Parameter + +log = logging.getLogger(__name__) + + +class SR86x(VisaInstrument): + """ + Base class for Stanford SR86x Lock-in Amplifier drivers. This class should not + be instantiated directly instead one of the model specific sub classes should be used. + """ + + _VOLT_TO_N: ClassVar[dict[int | float, int]] = { + 1: 0, + 500e-3: 1, + 200e-3: 2, + 100e-3: 3, + 50e-3: 4, + 20e-3: 5, + 10e-3: 6, + 5e-3: 7, + 2e-3: 8, + 1e-3: 9, + 500e-6: 10, + 200e-6: 11, + 100e-6: 12, + 50e-6: 13, + 20e-6: 14, + 10e-6: 15, + 5e-6: 16, + 2e-6: 17, + 1e-6: 18, + 500e-9: 19, + 200e-9: 20, + 100e-9: 21, + 50e-9: 22, + 20e-9: 23, + 10e-9: 24, + 5e-9: 25, + 2e-9: 26, + 1e-9: 27, + } + _N_TO_VOLT: ClassVar[dict[int, int | float]] = {v: k for k, v in _VOLT_TO_N.items()} + + _CURR_TO_N: ClassVar[dict[float, int]] = { + 1e-6: 0, + 500e-9: 1, + 200e-9: 2, + 100e-9: 3, + 50e-9: 4, + 20e-9: 5, + 10e-9: 6, + 5e-9: 7, + 2e-9: 8, + 1e-9: 9, + 500e-12: 10, + 200e-12: 11, + 100e-12: 12, + 50e-12: 13, + 20e-12: 14, + 10e-12: 15, + 5e-12: 16, + 2e-12: 17, + 1e-12: 18, + 500e-15: 19, + 200e-15: 20, + 100e-15: 21, + 50e-15: 22, + 20e-15: 23, + 10e-15: 24, + 5e-15: 25, + 2e-15: 26, + 1e-15: 27, + } + _N_TO_CURR: ClassVar[dict[int, float]] = {v: k for k, v in _CURR_TO_N.items()} + + _VOLT_ENUM = Enum(*_VOLT_TO_N.keys()) + _CURR_ENUM = Enum(*_CURR_TO_N.keys()) + + _INPUT_SIGNAL_TO_N: ClassVar[dict[str, int]] = { + "voltage": 0, + "current": 1, + } + _N_TO_INPUT_SIGNAL: ClassVar[dict[int, str]] = { + v: k for k, v in _INPUT_SIGNAL_TO_N.items() + } + + PARAMETER_NAMES: ClassVar[dict[str, str]] = { + "X": "0", # X output, 'X' + "Y": "1", # Y output, 'Y' + "R": "2", # R output, 'R' + "P": "3", # theta output, 'THeta' + "aux_in1": "4", # Aux In 1, 'IN1' + "aux_in2": "5", # Aux In 2, 'IN2' + "aux_in3": "6", # Aux In 3, 'IN3' + "aux_in4": "7", # Aux In 4, 'IN4' + "Xnoise": "8", # X noise, 'XNOise' + "Ynoise": "9", # Y noise, 'YNOise' + "aux_out1": "10", # Aux Out 1, 'OUT1' + "aux_out2": "11", # Aux Out 2, 'OUT2' + "phase": "12", # Reference Phase, 'PHAse' + "amplitude": "13", # Sine Out Amplitude, 'SAMp' + "sine_outdc": "14", # DC Level, 'LEVel' + "frequency": "15", # Int. Ref. Frequency, 'FInt' + "frequency_ext": "16", # Ext. Ref. Frequency, 'FExt' + } + + _N_DATA_CHANNELS = 4 + + default_terminator = "\n" + + def __init__( + self, + name: str, + address: str, + max_frequency: float, + reset: bool = False, + **kwargs: Unpack[VisaInstrumentKWArgs], + ): + super().__init__(name, address, **kwargs) + self._max_frequency = max_frequency + # Reference commands + self.frequency: Parameter = self.add_parameter( + name="frequency", + label="Frequency", + unit="Hz", + get_cmd="FREQ?", + set_cmd="FREQ {}", + get_parser=float, + vals=Numbers(min_value=1e-3, max_value=self._max_frequency), + ) + """Parameter frequency""" + self.sine_outdc: Parameter = self.add_parameter( + name="sine_outdc", + label="Sine out dc level", + unit="V", + get_cmd="SOFF?", + set_cmd="SOFF {}", + get_parser=float, + vals=Numbers(min_value=-5, max_value=5), + ) + """Parameter sine_outdc""" + self.amplitude: Parameter = self.add_parameter( + name="amplitude", + label="Amplitude", + unit="V", + get_cmd="SLVL?", + set_cmd="SLVL {}", + get_parser=float, + vals=Numbers(min_value=0, max_value=2), + ) + """Parameter amplitude""" + self.harmonic: Parameter = self.add_parameter( + name="harmonic", + label="Harmonic", + get_cmd="HARM?", + get_parser=int, + set_cmd="HARM {:d}", + vals=Ints(min_value=1, max_value=99), + ) + """Parameter harmonic""" + self.phase: Parameter = self.add_parameter( + name="phase", + label="Phase", + unit="deg", + get_cmd="PHAS?", + set_cmd="PHAS {}", + get_parser=float, + vals=Numbers(min_value=-3.6e5, max_value=3.6e5), + ) + """Parameter phase""" + # Signal commands + self.sensitivity: Parameter = self.add_parameter( + name="sensitivity", + label="Sensitivity", + get_cmd="SCAL?", + set_cmd="SCAL {:d}", + get_parser=self._get_sensitivity, + set_parser=self._set_sensitivity, + ) + """Parameter sensitivity""" + self.filter_slope: Parameter = self.add_parameter( + name="filter_slope", + label="Filter slope", + unit="dB/oct", + get_cmd="OFSL?", + set_cmd="OFSL {}", + val_mapping={6: 0, 12: 1, 18: 2, 24: 3}, + ) + """Parameter filter_slope""" + self.sync_filter: Parameter = self.add_parameter( + name="sync_filter", + label="Sync filter", + get_cmd="SYNC?", + set_cmd="SYNC {}", + val_mapping={"OFF": 0, "ON": 1}, + ) + """Parameter sync_filter""" + self.noise_bandwidth: Parameter = self.add_parameter( + name="noise_bandwidth", + label="Noise bandwidth", + unit="Hz", + get_cmd="ENBW?", + get_parser=float, + ) + """Parameter noise_bandwidth""" + self.signal_strength: Parameter = self.add_parameter( + name="signal_strength", + label="Signal strength indicator", + get_cmd="ILVL?", + get_parser=int, + ) + """Parameter signal_strength""" + self.signal_input: Parameter = self.add_parameter( + name="signal_input", + label="Signal input", + get_cmd="IVMD?", + get_parser=self._get_input_config, + set_cmd="IVMD {}", + set_parser=self._set_input_config, + vals=Enum(*self._INPUT_SIGNAL_TO_N.keys()), + ) + """Parameter signal_input""" + self.input_range: Parameter = self.add_parameter( + name="input_range", + label="Input range", + unit="V", + get_cmd="IRNG?", + set_cmd="IRNG {}", + val_mapping={1: 0, 300e-3: 1, 100e-3: 2, 30e-3: 3, 10e-3: 4}, + ) + """Parameter input_range""" + self.input_config: Parameter = self.add_parameter( + name="input_config", + label="Input configuration", + get_cmd="ISRC?", + set_cmd="ISRC {}", + val_mapping={"a": 0, "a-b": 1}, + ) + """Parameter input_config""" + self.input_shield: Parameter = self.add_parameter( + name="input_shield", + label="Input shield", + get_cmd="IGND?", + set_cmd="IGND {}", + val_mapping={"float": 0, "ground": 1}, + ) + """Parameter input_shield""" + self.input_gain: Parameter = self.add_parameter( + name="input_gain", + label="Input gain", + unit="ohm", + get_cmd="ICUR?", + set_cmd="ICUR {}", + val_mapping={1e6: 0, 100e6: 1}, + ) + """Parameter input_gain""" + self.adv_filter: Parameter = self.add_parameter( + name="adv_filter", + label="Advanced filter", + get_cmd="ADVFILT?", + set_cmd="ADVFILT {}", + val_mapping={"OFF": 0, "ON": 1}, + ) + """Parameter adv_filter""" + self.input_coupling: Parameter = self.add_parameter( + name="input_coupling", + label="Input coupling", + get_cmd="ICPL?", + set_cmd="ICPL {}", + val_mapping={"ac": 0, "dc": 1}, + ) + """Parameter input_coupling""" + self.time_constant: Parameter = self.add_parameter( + name="time_constant", + label="Time constant", + unit="s", + get_cmd="OFLT?", + set_cmd="OFLT {}", + val_mapping={ + 1e-6: 0, + 3e-6: 1, + 10e-6: 2, + 30e-6: 3, + 100e-6: 4, + 300e-6: 5, + 1e-3: 6, + 3e-3: 7, + 10e-3: 8, + 30e-3: 9, + 100e-3: 10, + 300e-3: 11, + 1: 12, + 3: 13, + 10: 14, + 30: 15, + 100: 16, + 300: 17, + 1e3: 18, + 3e3: 19, + 10e3: 20, + 30e3: 21, + }, + ) + """Parameter time_constant""" + + self.external_reference_trigger: Parameter = self.add_parameter( + name="external_reference_trigger", + label="External reference trigger mode", + get_cmd="RTRG?", + set_cmd="RTRG {}", + val_mapping={ + "SIN": 0, + "POS": 1, + "POSTTL": 1, + "NEG": 2, + "NEGTTL": 2, + }, + docstring="The triggering mode for synchronization of the " + "internal reference signal with the externally provided " + "one", + ) + """The triggering mode for synchronization of the internal reference signal with the externally provided one""" + + self.reference_source: Parameter = self.add_parameter( + name="reference_source", + label="Reference source", + get_cmd="RSRC?", + set_cmd="RSRC {}", + val_mapping={"INT": 0, "EXT": 1, "DUAL": 2, "CHOP": 3}, + docstring="The source of the reference signal", + ) + """The source of the reference signal""" + + self.external_reference_trigger_input_resistance: Parameter = ( + self.add_parameter( + name="external_reference_trigger_input_resistance", + label="External reference trigger input resistance", + get_cmd="REFZ?", + set_cmd="REFZ {}", + val_mapping={ + "50": 0, + "50OHMS": 0, + 0: 0, + "1M": 1, + "1MEG": 1, + 1: 1, + }, + docstring="Input resistance of the input for the external " + "reference signal", + ) + ) + """Input resistance of the input for the external reference signal""" + + # Auto functions + self.add_function("auto_range", call_cmd="ARNG") + self.add_function("auto_scale", call_cmd="ASCL") + self.add_function("auto_phase", call_cmd="APHS") + + # Data transfer + # first 4 parameters from a list of 16 below. + self.X: Parameter = self.add_parameter( + "X", + label="In-phase Magnitude", + get_cmd="OUTP? 0", + get_parser=float, + unit="V", + ) + """Parameter X""" + self.Y: Parameter = self.add_parameter( + "Y", + label="Out-phase Magnitude", + get_cmd="OUTP? 1", + get_parser=float, + unit="V", + ) + """Parameter Y""" + self.R: Parameter = self.add_parameter( + "R", label="Magnitude", get_cmd="OUTP? 2", get_parser=float, unit="V" + ) + """Parameter R""" + self.P: Parameter = self.add_parameter( + "P", label="Phase", get_cmd="OUTP? 3", get_parser=float, unit="deg" + ) + """Parameter P""" + + self.complex_voltage: Parameter = self.add_parameter( + "complex_voltage", + label="Voltage", + get_cmd=self._get_complex_voltage, + unit="V", + vals=ComplexNumbers(), + ) + """Parameter complex_voltage""" + + # CH1/CH2 Output Commands + self.X_offset: Parameter = self.add_parameter( + "X_offset", + label="X offset ", + unit="%", + get_cmd="COFP? 0", + set_cmd="COFP 0, {}", + get_parser=float, + vals=Numbers(min_value=-999.99, max_value=999.99), + ) + """Parameter X_offset""" + self.Y_offset: Parameter = self.add_parameter( + "Y_offset", + label="Y offset", + unit="%", + get_cmd="COFP? 1", + set_cmd="COFP 1, {}", + get_parser=float, + vals=Numbers(min_value=-999.99, max_value=999.99), + ) + """Parameter Y_offset""" + self.R_offset: Parameter = self.add_parameter( + "R_offset", + label="R offset", + unit="%", + get_cmd="COFP? 2", + set_cmd="COFP 2, {}", + get_parser=float, + vals=Numbers(min_value=-999.99, max_value=999.99), + ) + """Parameter R_offset""" + self.X_expand: Parameter = self.add_parameter( + "X_expand", + label="X expand multiplier", + get_cmd="CEXP? 0", + set_cmd="CEXP 0, {}", + val_mapping={"OFF": "0", "X10": "1", "X100": "2"}, + ) + """Parameter X_expand""" + self.Y_expand: Parameter = self.add_parameter( + "Y_expand", + label="Y expand multiplier", + get_cmd="CEXP? 1", + set_cmd="CEXP 1, {}", + val_mapping={"OFF": 0, "X10": 1, "X100": 2}, + ) + """Parameter Y_expand""" + self.R_expand: Parameter = self.add_parameter( + "R_expand", + label="R expand multiplier", + get_cmd="CEXP? 2", + set_cmd="CEXP 2, {}", + val_mapping={"OFF": 0, "X10": 1, "X100": 2}, + ) + """Parameter R_expand""" + + # Aux input/output + {% for i in [0, 1, 2, 3] %} + self.aux_in{{i}} = self.add_parameter( + "aux_in{{i}}", + label="Aux input {{i}}", + get_cmd="OAUX? {{i}}", + get_parser=float, + unit="V", + ) + self.aux_out{{i}} = self.add_parameter( + "aux_out{{i}}", + label="Aux output {{i}}", + get_cmd="AUXV? {{i}}", + get_parser=float, + set_cmd="AUXV {{i}}, {}", + unit="V", + ) + {% endfor %} + + # Data channels: + # 'DAT1' (green), 'DAT2' (blue), 'DAT3' (yellow), 'DAT4' (orange) + data_channels = ChannelList( + self, "data_channels", SR86xDataChannel, snapshotable=False + ) + {% for num, color in [(0, "green"), (1, "blue"), (2, "yellow"), (3, "orange")] %} + + data_channel = SR86xDataChannel(self, "data_channel_{{ num + 1 }}", "{{ num }}", "DAT{{ num + 1 }}", "{{ color }}") + + data_channels.append(data_channel) + self.data_channel_{{ num + 1 }} = self.add_submodule("data_channel_{{ num + 1 }}", data_channel) + {% endfor %} + + self.data_channels: ChannelTuple[SR86xDataChannel] = self.add_submodule( + "data_channels", data_channels.to_channel_tuple() + ) + """Interface for the SR86x data channels""" + + # Interface + self.add_function("reset", call_cmd="*RST") + + self.add_function("disable_front_panel", call_cmd="OVRM 0") + self.add_function("enable_front_panel", call_cmd="OVRM 1") + + buffer = SR86xBuffer(self, f"{self.name}_buffer") + self.buffer: SR86xBuffer = self.add_submodule("buffer", buffer) + """Interface for the SR86x buffer""" + + self.input_config() + self.connect_message() + + def _set_units(self, unit: str) -> None: + for param in [self.X, self.Y, self.R, self.sensitivity]: + param.unit = unit + + def _get_complex_voltage(self) -> complex: + x, y = self.get_values("X", "Y") + return x + 1.0j * y + + def _get_input_config(self, s: int) -> str: + mode = self._N_TO_INPUT_SIGNAL[int(s)] + + if mode == "voltage": + self.sensitivity.vals = self._VOLT_ENUM + self._set_units("V") + else: + self.sensitivity.vals = self._CURR_ENUM + self._set_units("A") + + return mode + + def _set_input_config(self, s: str) -> int: + if s == "voltage": + self.sensitivity.vals = self._VOLT_ENUM + self._set_units("V") + else: + self.sensitivity.vals = self._CURR_ENUM + self._set_units("A") + + return self._INPUT_SIGNAL_TO_N[s] + + def _get_sensitivity(self, s: int) -> float: + if self.signal_input() == "voltage": + return self._N_TO_VOLT[int(s)] + else: + return self._N_TO_CURR[int(s)] + + def _set_sensitivity(self, s: float) -> int: + if self.signal_input() == "voltage": + return self._VOLT_TO_N[s] + else: + return self._CURR_TO_N[s] + + def get_values(self, *parameter_names: str) -> tuple[float, ...]: + """ + Get values of 2 or 3 parameters that are measured by the lock-in + amplifier. These values are guaranteed to come from the same + measurement cycle as opposed to getting values of parameters one by + one (for example, by calling `sr.X()`, and then `sr.Y()`. + + Args: + *parameter_names: 2 or 3 names of parameters for which the values + are requested; valid names can be found in `PARAMETER_NAMES` + attribute of the driver class + + Returns: + a tuple of 2 or 3 floating point values + + """ + if not 2 <= len(parameter_names) <= 3: + raise KeyError( + "It is only possible to request values of 2 or 3 parameters at a time." + ) + + for name in parameter_names: + if name not in self.PARAMETER_NAMES: + raise KeyError( + f"{name} is not a valid parameter name. Refer " + f"to `PARAMETER_NAMES` for a list of valid " + f"parameter names" + ) + + p_ids = [self.PARAMETER_NAMES[name] for name in parameter_names] + output = self.ask(f"SNAP? {','.join(p_ids)}") + return tuple(float(val) for val in output.split(",")) + + def get_data_channels_values(self) -> tuple[float, ...]: + """ + Queries the current values of the data channels + + Returns: + tuple of 4 values of the data channels + + """ + output = self.ask("SNAPD?") + return tuple(float(val) for val in output.split(",")) + + def get_data_channels_parameters( + self, query_instrument: bool = True + ) -> tuple[str, ...]: + """ + Convenience method to query a list of parameters which the data + channels are currently assigned to. + + Args: + query_instrument: If set to False, the internally cashed names of + the parameters will be returned; if True, then the names will + be queried through the instrument + + Returns: + a tuple of 4 strings of parameter names + + """ + if query_instrument: + method_name = "get" + else: + method_name = "get_latest" + + return tuple( + getattr(getattr(self.data_channels[i], "assigned_parameter"), method_name)() + for i in range(self._N_DATA_CHANNELS) + ) + + def get_data_channels_dict(self, requery_names: bool = False) -> dict[str, float]: + """ + Returns a dictionary where the keys are parameter names currently + assigned to the data channels, and values are the values of those + parameters. + + Args: + requery_names: if False, the currently assigned parameter names + will not be queries from the instrument in order to save time + on communication, in this case the cached assigned parameter + names will be used for the keys of the dicitonary; if True, + the assigned parameter names will be queried from the + instrument + + Returns: + a dictionary where keys are names of parameters assigned to the + data channels, and values are the values of those parameters + + """ + parameter_names = self.get_data_channels_parameters(requery_names) + parameter_values = self.get_data_channels_values() + return dict(zip(parameter_names, parameter_values)) diff --git a/src/qcodes/instrument_drivers/stanford_research/SR86x.py b/src/qcodes/instrument_drivers/stanford_research/SR86x.py index a7d7afa50cde..c06203087bdf 100644 --- a/src/qcodes/instrument_drivers/stanford_research/SR86x.py +++ b/src/qcodes/instrument_drivers/stanford_research/SR86x.py @@ -1,25 +1,19 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any, ClassVar - -import numpy as np -import numpy.typing as npt +from typing import TYPE_CHECKING, ClassVar from qcodes.instrument import ( ChannelList, ChannelTuple, - InstrumentBaseKWArgs, - InstrumentChannel, VisaInstrument, VisaInstrumentKWArgs, ) -from qcodes.parameters import ArrayParameter from qcodes.validators import ComplexNumbers, Enum, Ints, Numbers -if TYPE_CHECKING: - from collections.abc import Callable, Sequence +from ._SR86x_modules import SR86xBuffer, SR86xDataChannel +if TYPE_CHECKING: from typing_extensions import Unpack from qcodes.parameters import Parameter @@ -27,665 +21,6 @@ log = logging.getLogger(__name__) -class SR86xBufferReadout(ArrayParameter): - """ - The parameter array that holds read out data. We need this to be compatible - with qcodes.Measure - - Args: - name: Name of the parameter. - instrument: The instrument to add this parameter to. - - """ - - def __init__(self, name: str, instrument: SR86x, **kwargs: Any) -> None: - unit = "deg" - if name in ["X", "Y", "R"]: - unit = "V" - - super().__init__( - name, - shape=(1,), # dummy initial shape - unit=unit, - setpoint_names=("Time",), - setpoint_labels=("Time",), - setpoint_units=("s",), - instrument=instrument, - docstring="Holds an acquired (part of the) data buffer of one channel.", - **kwargs, - ) - - self._capture_data: npt.NDArray | None = None - - def prepare_readout(self, capture_data: npt.NDArray) -> None: - """ - Prepare this parameter for readout. - - Args: - capture_data: The data to capture. - - """ - self._capture_data = capture_data - - data_len = len(capture_data) - self.shape = (data_len,) - self.setpoint_units = ("",) - self.setpoint_names = ("sample_nr",) - self.setpoint_labels = ("Sample number",) - self.setpoints = (tuple(np.arange(0, data_len)),) - - def get_raw(self) -> npt.NDArray: - """ - Public method to access the capture data - """ - if self._capture_data is None: - raise ValueError( - f"Cannot return data for parameter {self.name}. " - f"Please prepare for readout by calling " - f"'get_capture_data' with appropriate " - f"configuration settings" - ) - - return self._capture_data - - -class SR86xBuffer(InstrumentChannel): - """ - Buffer module for the SR86x drivers. - - This driver has been verified to work with the SR860 and SR865. - For reference, please consult the SR860 - manual: http://thinksrs.com/downloads/PDFs/Manuals/SR860m.pdf - """ - - def __init__( - self, parent: SR86x, name: str, **kwargs: Unpack[InstrumentBaseKWArgs] - ) -> None: - super().__init__(parent, name, **kwargs) - - self.capture_length_in_kb: Parameter = self.add_parameter( - "capture_length_in_kb", - label="get/set capture length", - get_cmd="CAPTURELEN?", - set_cmd="CAPTURELEN {}", - set_parser=self._set_capture_len_parser, - get_parser=int, - unit="kB", - ) - """Parameter capture_length_in_kb""" - self.bytes_per_sample = 4 - self.min_capture_length_in_kb = 1 # i.e. minimum buffer size - self.max_capture_length_in_kb = 4096 # i.e. maximum buffer size - # Maximum amount of kB that can be read per single CAPTUREGET command - self.max_size_per_reading_in_kb = 64 - - self.capture_config: Parameter = self.add_parameter( - "capture_config", - label="capture configuration", - get_cmd="CAPTURECFG?", - set_cmd="CAPTURECFG {}", - val_mapping={"X": "0", "X,Y": "1", "R,T": "2", "X,Y,R,T": "3"}, - ) - """Parameter capture_config configures which parameters we want to capture""" - - self.capture_rate_max: Parameter = self.add_parameter( - "capture_rate_max", - label="capture rate maximum", - get_cmd="CAPTURERATEMAX?", - get_parser=float, - ) - """Parameter capture_rate_max""" - - self.capture_rate: Parameter = self.add_parameter( - "capture_rate", - label="capture rate raw", - get_cmd="CAPTURERATE?", - set_cmd="CAPTURERATE {}", - get_parser=float, - set_parser=self._set_capture_rate_parser, - ) - """Parameter capture_rate""" - - max_rate = self.capture_rate_max() - self.available_frequencies = [max_rate / 2**i for i in range(20)] - - self.capture_status: Parameter = self.add_parameter( - "capture_status", label="capture status", get_cmd="CAPTURESTAT?" - ) - """Parameter capture_status: Are we capturing at the moment?""" - - self.count_capture_bytes: Parameter = self.add_parameter( - "count_capture_bytes", - label="captured bytes", - get_cmd="CAPTUREBYTES?", - unit="B", - get_parser=int, - docstring="Number of bytes captured so far in the buffer. Can be " - "used to track live progress.", - ) - """Number of bytes captured so far in the buffer. Can be used to track live progress.""" - - self.count_capture_kilobytes: Parameter = self.add_parameter( - "count_capture_kilobytes", - label="captured kilobytes", - get_cmd="CAPTUREPROG?", - unit="kB", - docstring="Number of kilobytes captured so far in the buffer, " - "rounded-up to 2 kilobyte chunks. Capture must be " - "stopped before requesting the value of this " - "parameter. If the acquisition wrapped during operating " - "in Continuous mode, then the returned value is " - "simply equal to the current capture length.", - ) - """ - Number of kilobytes captured so far in the buffer, rounded-up to 2 kilobyte chunks. - Capture must be stopped before requesting the value of this parameter. - If the acquisition wrapped during operating in Continuous mode, - then the returned value is simply equal to the current capture length. - """ - - self.X: SR86xBufferReadout = self.add_parameter( - "X", parameter_class=SR86xBufferReadout - ) - """ - X buffer readout. - """ - self.Y: SR86xBufferReadout = self.add_parameter( - "Y", parameter_class=SR86xBufferReadout - ) - """ - Y buffer readout. - """ - self.R: SR86xBufferReadout = self.add_parameter( - "R", parameter_class=SR86xBufferReadout - ) - """ - R buffer readout. - """ - self.T: SR86xBufferReadout = self.add_parameter( - "T", parameter_class=SR86xBufferReadout - ) - """ - T buffer readout. - """ - - def snapshot_base( - self, - update: bool | None = False, - params_to_skip_update: Sequence[str] | None = None, - ) -> dict[Any, Any]: - if params_to_skip_update is None: - params_to_skip_update = [] - # we omit count_capture_kilobytes from the snapshot because - # it can only be read after a completed capture and will - # timeout otherwise when the snapshot is updated, e.g. at - # station creation time - params_to_skip_update = list(params_to_skip_update) - params_to_skip_update.append("count_capture_kilobytes") - - snapshot = super().snapshot_base(update, params_to_skip_update) - return snapshot - - def _set_capture_len_parser(self, capture_length_in_kb: int) -> int: - """ - Parse the capture length in kB according to the way buffer treats it - (refer to the manual for details). The given value has to fit in the - range and has to be even, otherwise this function raises exceptions. - - Args: - capture_length_in_kb: The desired capture length in kB. - - Returns: - capture_length_in_kb - - """ - if capture_length_in_kb % 2: - raise ValueError("The capture length should be an even number") - - if ( - not self.min_capture_length_in_kb - <= capture_length_in_kb - <= self.max_capture_length_in_kb - ): - raise ValueError( - f"The capture length should be between " - f"{self.min_capture_length_in_kb} and " - f"{self.max_capture_length_in_kb}" - ) - - return capture_length_in_kb - - def set_capture_rate_to_maximum(self) -> None: - """ - Sets the capture rate to maximum. The maximum capture rate is - retrieved from the device, and depends on the current value of the - time constant. - """ - self.capture_rate(self.capture_rate_max()) - - def _set_capture_rate_parser(self, capture_rate_hz: float) -> int: - """ - According to the manual, the capture rate query returns a value in - Hz, but then setting this value it is expected to give a value n, - where the capture rate in Hz is given by - capture_rate_hz = max_rate / 2 ** n. Please see page 136 of the - manual. Here n is an integer in the range [0, 20]. - - Args: - capture_rate_hz: The desired capture rate in Hz. If the desired - rate is more than 1 Hz from the nearest valid rate, a warning - is issued and the nearest valid rate it used. - - Returns: - n_round - - """ - max_rate = self.capture_rate_max() - n = np.log2(max_rate / capture_rate_hz) - n_round = round(n) - - if not 0 <= n_round <= 20: - raise ValueError( - f"The chosen frequency is invalid. Please " - f"consult the SR860 manual at page 136. " - f"The maximum capture rate is {max_rate}" - ) - - nearest_valid_rate = max_rate / 2**n_round - if abs(capture_rate_hz - nearest_valid_rate) > 1: - available_frequencies = ", ".join( - str(f) for f in self.available_frequencies - ) - log.warning(f"Warning: Setting capture rate to {nearest_valid_rate:.5} Hz") - log.warning(f"The available frequencies are: {available_frequencies}") - - return n_round - - def start_capture(self, acquisition_mode: str, trigger_mode: str) -> None: - """ - Start an acquisition. Please see page 137 of the manual for a detailed - explanation. - - Args: - acquisition_mode: "ONE" | "CONT" - trigger_mode: "IMM" | "TRIG" | "SAMP" - - """ - - if acquisition_mode not in ["ONE", "CONT"]: - raise ValueError("The acquisition mode needs to be either 'ONE' or 'CONT'") - - if trigger_mode not in ["IMM", "TRIG", "SAMP"]: - raise ValueError( - "The trigger mode needs to be either 'IMM', 'TRIG' or 'SAMP'" - ) - - cmd_str = f"CAPTURESTART {acquisition_mode}, {trigger_mode}" - self.write(cmd_str) - - def stop_capture(self) -> None: - """Stop a capture""" - self.write("CAPTURESTOP") - - def _get_list_of_capture_variable_names(self) -> list[str]: - """ - Retrieve the list of names of variables (readouts) that are - set to be captured - """ - return self.capture_config().split(",") - - def _get_number_of_capture_variables(self) -> int: - """ - Retrieve the number of variables (readouts) that are - set to be captured - """ - capture_variables = self._get_list_of_capture_variable_names() - n_variables = len(capture_variables) - return n_variables - - def _calc_capture_size_in_kb(self, sample_count: int) -> int: - """ - Given the number of samples to capture, calculate the capture length - that the buffer needs to be set to in order to fit the requested - number of samples. Note that the number of activated readouts is - taken into account. - """ - n_variables = self._get_number_of_capture_variables() - total_size_in_kb = int( - np.ceil(n_variables * sample_count * self.bytes_per_sample / 1024) - ) - # Make sure that the total size in kb is an even number, as expected by - # the instrument - if total_size_in_kb % 2: - total_size_in_kb += 1 - return total_size_in_kb - - def set_capture_length_to_fit_samples(self, sample_count: int) -> None: - """ - Set the capture length of the buffer to fit the given number of - samples. - - Args: - sample_count: Number of samples that the buffer has to fit - - """ - total_size_in_kb = self._calc_capture_size_in_kb(sample_count) - self.capture_length_in_kb(total_size_in_kb) - - def wait_until_samples_captured(self, sample_count: int) -> None: - """ - Wait until the given number of samples is captured. This function - is blocking and has to be used with caution because it does not have - a timeout. - - Args: - sample_count: Number of samples that needs to be captured - - """ - n_captured_bytes = 0 - n_variables = self._get_number_of_capture_variables() - n_bytes_to_capture = sample_count * n_variables * self.bytes_per_sample - while n_captured_bytes < n_bytes_to_capture: - n_captured_bytes = self.count_capture_bytes() - - def get_capture_data(self, sample_count: int) -> dict[str, npt.NDArray]: - """ - Read the given number of samples of the capture data from the buffer. - - Args: - sample_count: number of samples to read from the buffer - - Returns: - The keys in the dictionary correspond to the captured - variables. For instance, if before the capture, the capture - config was set as 'capture_config("X,Y")', then the keys will - be "X" and "Y". The values in the dictionary are numpy arrays - of numbers. - - """ - total_size_in_kb = self._calc_capture_size_in_kb(sample_count) - capture_variables = self._get_list_of_capture_variable_names() - n_variables = self._get_number_of_capture_variables() - - values = self._get_raw_capture_data(total_size_in_kb) - - # Remove zeros which mark the end part of the buffer that is not - # filled with captured data - values = values[values != 0] - - values = values.reshape((-1, n_variables)).T - values = values[:, :sample_count] - - data = {k: v for k, v in zip(capture_variables, values)} - - for capture_variable in capture_variables: - buffer_parameter = getattr(self, capture_variable) - buffer_parameter.prepare_readout(data[capture_variable]) - - return data - - def _get_raw_capture_data(self, size_in_kb: int) -> npt.NDArray: - """ - Read data from the buffer from its beginning avoiding the instrument - limit of 64 kilobytes per reading. - - Args: - size_in_kb :Size of the data that needs to be read; if it exceeds - the capture length, an exception is raised. - - Returns: - A one-dimensional numpy array of the requested data. Note that the - returned array contains data for all the variables that are - mentioned in the capture config. - - """ - current_capture_length = self.capture_length_in_kb() - if size_in_kb > current_capture_length: - raise ValueError( - f"The size of the requested data ({size_in_kb}kB) " - f"is larger than current capture length of the " - f"buffer ({current_capture_length}kB)." - ) - - values: npt.NDArray = np.array([]) - data_size_to_read_in_kb = size_in_kb - n_readings = 0 - - while data_size_to_read_in_kb > 0: - offset = n_readings * self.max_size_per_reading_in_kb - - if data_size_to_read_in_kb > self.max_size_per_reading_in_kb: - size_of_this_reading = self.max_size_per_reading_in_kb - else: - size_of_this_reading = data_size_to_read_in_kb - - data_from_this_reading = self._get_raw_capture_data_block( - size_of_this_reading, offset_in_kb=offset - ) - values = np.append(values, data_from_this_reading) - - data_size_to_read_in_kb -= size_of_this_reading - n_readings += 1 - - return values - - def _get_raw_capture_data_block( - self, size_in_kb: int, offset_in_kb: int = 0 - ) -> npt.NDArray: - """ - Read data from the buffer. The maximum amount of data that can be - read with this function (size_in_kb) is 64kB (this limitation comes - from the instrument). The offset argument can be used to navigate - along the buffer. - - An exception will be raised if either size_in_kb or offset_in_kb are - longer that the *current* capture length (number of kB of data that is - captured so far rounded up to 2kB chunks). If (offset_in_kb + - size_in_kb) is longer than the *current* capture length, - the instrument returns the wrapped data. - - For more information, refer to the description of the "CAPTUREGET" - command in the manual. - - Args: - size_in_kb: Amount of data in kB that is to be read from the buffer - offset_in_kb: Offset within the buffer of where to read the data; - for example, when 0 is specified, the data is read from the - start of the buffer. - - Returns: - A one-dimensional numpy array of the requested data. Note that the - returned array contains data for all the variables that are - mentioned in the capture config. - - """ - if size_in_kb > self.max_size_per_reading_in_kb: - raise ValueError( - f"The size of the requested data ({size_in_kb}kB) " - f"is larger than maximum size that can be read " - f"at once ({self.max_size_per_reading_in_kb}kB)." - ) - - # Calculate the size of the data captured so far, in kB, rounded up - # to 2kB chunks - size_of_currently_captured_data = int( - np.ceil(np.ceil(self.count_capture_bytes() / 1024) / 2) * 2 - ) - - if size_in_kb > size_of_currently_captured_data: - raise ValueError( - f"The size of the requested data ({size_in_kb}kB) " - f"cannot be larger than the size of currently " - f"captured data rounded up to 2kB chunks " - f"({size_of_currently_captured_data}kB)" - ) - - if offset_in_kb > size_of_currently_captured_data: - raise ValueError( - f"The offset for reading the requested data " - f"({offset_in_kb}kB) cannot be larger than the " - f"size of currently captured data rounded up to " - f"2kB chunks " - f"({size_of_currently_captured_data}kB)" - ) - - values = self._parent.visa_handle.query_binary_values( - f"CAPTUREGET? {offset_in_kb}, {size_in_kb}", - datatype="f", - is_big_endian=False, - expect_termination=False, - ) - # the sr86x does not include an extra termination char on binary - # messages so we set expect_termination to False - - return np.array(values) - - def capture_one_sample_per_trigger( - self, trigger_count: int, start_triggers_pulsetrain: Callable[..., Any] - ) -> dict[str, npt.NDArray]: - """ - Capture one sample per each trigger, and return when the specified - number of triggers has been received. - - Args: - trigger_count: Number of triggers to capture samples for - start_triggers_pulsetrain: By calling this *non-blocking* - function, the train of trigger pulses should start - - Returns: - The keys in the dictionary correspond to the captured - variables. For instance, if before the capture, the capture - config was set as 'capture_config("X,Y")', then the keys will - be "X" and "Y". The values in the dictionary are numpy arrays - of numbers. - - """ - self.set_capture_length_to_fit_samples(trigger_count) - self.start_capture("ONE", "SAMP") - start_triggers_pulsetrain() - self.wait_until_samples_captured(trigger_count) - self.stop_capture() - return self.get_capture_data(trigger_count) - - def capture_samples_after_trigger( - self, sample_count: int, send_trigger: Callable[..., Any] - ) -> dict[str, npt.NDArray]: - """ - Capture a number of samples after a trigger has been received. - Please refer to page 135 of the manual for details. - - Args: - sample_count: Number of samples to capture - send_trigger: By calling this *non-blocking* function, one trigger - should be sent that will initiate the capture - - Returns: - The keys in the dictionary correspond to the captured - variables. For instance, if before the capture, the capture - config was set as 'capture_config("X,Y")', then the keys will - be "X" and "Y". The values in the dictionary are numpy arrays - of numbers. - - """ - self.set_capture_length_to_fit_samples(sample_count) - self.start_capture("ONE", "TRIG") - send_trigger() - self.wait_until_samples_captured(sample_count) - self.stop_capture() - return self.get_capture_data(sample_count) - - def capture_samples(self, sample_count: int) -> dict[str, npt.NDArray]: - """ - Capture a number of samples at a capture rate, starting immediately. - Unlike the "continuous" capture mode, here the buffer does not get - overwritten with the new data once the buffer is full. - - The function blocks until the required number of samples is acquired, - and returns them. - - Args: - sample_count: Number of samples to capture - - Returns: - The keys in the dictionary correspond to the captured - variables. For instance, if before the capture, the capture - config was set as 'capture_config("X,Y")', then the keys will - be "X" and "Y". The values in the dictionary are numpy arrays - of numbers. - - """ - self.set_capture_length_to_fit_samples(sample_count) - self.start_capture("ONE", "IMM") - self.wait_until_samples_captured(sample_count) - self.stop_capture() - return self.get_capture_data(sample_count) - - -class SR86xDataChannel(InstrumentChannel): - """ - Implements a data channel of SR86x lock-in amplifier. Parameters that are - assigned to these channels get plotted on the display of the instrument. - Moreover, there are commands that allow to conveniently retrieve the values - of the parameters that are currently assigned to the data channels. - - This class relies on the available parameter names that should be - mentioned in the lock-in amplifier class in `PARAMETER_NAMES` attribute. - - Args: - parent: an instance of SR86x driver - name: data channel name that is to be used to reference it from the - parent - cmd_id: this ID is used in VISA commands to refer to this data channel, - usually is an integer number - cmd_id_name: this name can also be used in VISA commands along with - channel_id; it is not used in this implementation, but is added - for reference - color: every data channel is also referred to by the color with which it - is being plotted on the instrument's screen; added here only for - reference - - """ - - def __init__( - self, - parent: SR86x, - name: str, - cmd_id: str, - cmd_id_name: str | None = None, - color: str | None = None, - **kwargs: Unpack[InstrumentBaseKWArgs], - ) -> None: - super().__init__(parent, name, **kwargs) - - self._cmd_id = cmd_id - self._cmd_id_name = cmd_id_name - self._color = color - - self.assigned_parameter: Parameter = self.add_parameter( - "assigned_parameter", - label=f"Data channel {cmd_id} parameter", - docstring=f"Allows to set and get the " - f"parameter that is assigned to data " - f"channel {cmd_id}", - set_cmd=f"CDSP {cmd_id}, {{}}", - get_cmd=f"CDSP? {cmd_id}", - val_mapping=self.parent.PARAMETER_NAMES, - ) - """Allows to set and get the parameter that is assigned to the channel""" - - @property - def cmd_id(self) -> str: - return self._cmd_id - - @property - def cmd_id_name(self) -> str | None: - return self._cmd_id_name - - @property - def color(self) -> str | None: - return self._color - - class SR86x(VisaInstrument): """ Base class for Stanford SR86x Lock-in Amplifier drivers. This class should not @@ -1132,39 +467,92 @@ def __init__( """Parameter R_expand""" # Aux input/output - for i in [0, 1, 2, 3]: - self.add_parameter( - f"aux_in{i}", - label=f"Aux input {i}", - get_cmd=f"OAUX? {i}", - get_parser=float, - unit="V", - ) - self.add_parameter( - f"aux_out{i}", - label=f"Aux output {i}", - get_cmd=f"AUXV? {i}", - get_parser=float, - set_cmd=f"AUXV {i}, {{}}", - unit="V", - ) + self.aux_in0 = self.add_parameter( + "aux_in0", + label="Aux input 0", + get_cmd="OAUX? 0", + get_parser=float, + unit="V", + ) + self.aux_out0 = self.add_parameter( + "aux_out0", + label="Aux output 0", + get_cmd="AUXV? 0", + get_parser=float, + set_cmd="AUXV 0, {}", + unit="V", + ) + self.aux_in1 = self.add_parameter( + "aux_in1", + label="Aux input 1", + get_cmd="OAUX? 1", + get_parser=float, + unit="V", + ) + self.aux_out1 = self.add_parameter( + "aux_out1", + label="Aux output 1", + get_cmd="AUXV? 1", + get_parser=float, + set_cmd="AUXV 1, {}", + unit="V", + ) + self.aux_in2 = self.add_parameter( + "aux_in2", + label="Aux input 2", + get_cmd="OAUX? 2", + get_parser=float, + unit="V", + ) + self.aux_out2 = self.add_parameter( + "aux_out2", + label="Aux output 2", + get_cmd="AUXV? 2", + get_parser=float, + set_cmd="AUXV 2, {}", + unit="V", + ) + self.aux_in3 = self.add_parameter( + "aux_in3", + label="Aux input 3", + get_cmd="OAUX? 3", + get_parser=float, + unit="V", + ) + self.aux_out3 = self.add_parameter( + "aux_out3", + label="Aux output 3", + get_cmd="AUXV? 3", + get_parser=float, + set_cmd="AUXV 3, {}", + unit="V", + ) # Data channels: # 'DAT1' (green), 'DAT2' (blue), 'DAT3' (yellow), 'DAT4' (orange) data_channels = ChannelList( self, "data_channels", SR86xDataChannel, snapshotable=False ) - for num, color in zip( - range(self._N_DATA_CHANNELS), ("green", "blue", "yellow", "orange") - ): - cmd_id = f"{num}" - cmd_id_name = f"DAT{num + 1}" - ch_name = f"data_channel_{num + 1}" - data_channel = SR86xDataChannel(self, ch_name, cmd_id, cmd_id_name, color) + data_channel = SR86xDataChannel(self, "data_channel_1", "0", "DAT1", "green") + + data_channels.append(data_channel) + self.data_channel_1 = self.add_submodule("data_channel_1", data_channel) + + data_channel = SR86xDataChannel(self, "data_channel_2", "1", "DAT2", "blue") + + data_channels.append(data_channel) + self.data_channel_2 = self.add_submodule("data_channel_2", data_channel) + + data_channel = SR86xDataChannel(self, "data_channel_3", "2", "DAT3", "yellow") + + data_channels.append(data_channel) + self.data_channel_3 = self.add_submodule("data_channel_3", data_channel) + + data_channel = SR86xDataChannel(self, "data_channel_4", "3", "DAT4", "orange") - data_channels.append(data_channel) - self.add_submodule(ch_name, data_channel) + data_channels.append(data_channel) + self.data_channel_4 = self.add_submodule("data_channel_4", data_channel) self.data_channels: ChannelTuple[SR86xDataChannel] = self.add_submodule( "data_channels", data_channels.to_channel_tuple() diff --git a/src/qcodes/instrument_drivers/stanford_research/_SR86x_modules.py b/src/qcodes/instrument_drivers/stanford_research/_SR86x_modules.py new file mode 100644 index 000000000000..ccf1358d83c9 --- /dev/null +++ b/src/qcodes/instrument_drivers/stanford_research/_SR86x_modules.py @@ -0,0 +1,665 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +import numpy as np +import numpy.typing as npt + +from qcodes.instrument import ( + InstrumentBaseKWArgs, + InstrumentChannel, +) +from qcodes.parameters import ArrayParameter + +if TYPE_CHECKING: + from collections.abc import Callable, Sequence + + from typing_extensions import Unpack + + from qcodes.parameters import Parameter + + from .SR86x import SR86x + +log = logging.getLogger(__name__) + + +class SR86xBufferReadout(ArrayParameter): + """ + The parameter array that holds read out data. We need this to be compatible + with qcodes.Measure + + Args: + name: Name of the parameter. + instrument: The instrument to add this parameter to. + + """ + + def __init__(self, name: str, instrument: SR86x, **kwargs: Any) -> None: + unit = "deg" + if name in ["X", "Y", "R"]: + unit = "V" + + super().__init__( + name, + shape=(1,), # dummy initial shape + unit=unit, + setpoint_names=("Time",), + setpoint_labels=("Time",), + setpoint_units=("s",), + instrument=instrument, + docstring="Holds an acquired (part of the) data buffer of one channel.", + **kwargs, + ) + + self._capture_data: npt.NDArray | None = None + + def prepare_readout(self, capture_data: npt.NDArray) -> None: + """ + Prepare this parameter for readout. + + Args: + capture_data: The data to capture. + + """ + self._capture_data = capture_data + + data_len = len(capture_data) + self.shape = (data_len,) + self.setpoint_units = ("",) + self.setpoint_names = ("sample_nr",) + self.setpoint_labels = ("Sample number",) + self.setpoints = (tuple(np.arange(0, data_len)),) + + def get_raw(self) -> npt.NDArray: + """ + Public method to access the capture data + """ + if self._capture_data is None: + raise ValueError( + f"Cannot return data for parameter {self.name}. " + f"Please prepare for readout by calling " + f"'get_capture_data' with appropriate " + f"configuration settings" + ) + + return self._capture_data + + +class SR86xBuffer(InstrumentChannel): + """ + Buffer module for the SR86x drivers. + + This driver has been verified to work with the SR860 and SR865. + For reference, please consult the SR860 + manual: http://thinksrs.com/downloads/PDFs/Manuals/SR860m.pdf + """ + + def __init__( + self, parent: SR86x, name: str, **kwargs: Unpack[InstrumentBaseKWArgs] + ) -> None: + super().__init__(parent, name, **kwargs) + + self.capture_length_in_kb: Parameter = self.add_parameter( + "capture_length_in_kb", + label="get/set capture length", + get_cmd="CAPTURELEN?", + set_cmd="CAPTURELEN {}", + set_parser=self._set_capture_len_parser, + get_parser=int, + unit="kB", + ) + """Parameter capture_length_in_kb""" + self.bytes_per_sample = 4 + self.min_capture_length_in_kb = 1 # i.e. minimum buffer size + self.max_capture_length_in_kb = 4096 # i.e. maximum buffer size + # Maximum amount of kB that can be read per single CAPTUREGET command + self.max_size_per_reading_in_kb = 64 + + self.capture_config: Parameter = ( + self.add_parameter( # Configure which parameters we want to capture + "capture_config", + label="capture configuration", + get_cmd="CAPTURECFG?", + set_cmd="CAPTURECFG {}", + val_mapping={"X": "0", "X,Y": "1", "R,T": "2", "X,Y,R,T": "3"}, + ) + ) + """Parameter capture_config""" + + self.capture_rate_max: Parameter = self.add_parameter( + "capture_rate_max", + label="capture rate maximum", + get_cmd="CAPTURERATEMAX?", + get_parser=float, + ) + """Parameter capture_rate_max""" + + self.capture_rate: Parameter = self.add_parameter( + "capture_rate", + label="capture rate raw", + get_cmd="CAPTURERATE?", + set_cmd="CAPTURERATE {}", + get_parser=float, + set_parser=self._set_capture_rate_parser, + ) + """Parameter capture_rate""" + + max_rate = self.capture_rate_max() + self.available_frequencies = [max_rate / 2**i for i in range(20)] + + self.capture_status: Parameter = ( + self.add_parameter( # Are we capturing at the moment? + "capture_status", label="capture status", get_cmd="CAPTURESTAT?" + ) + ) + """Parameter capture_status""" + + self.count_capture_bytes: Parameter = self.add_parameter( + "count_capture_bytes", + label="captured bytes", + get_cmd="CAPTUREBYTES?", + unit="B", + get_parser=int, + docstring="Number of bytes captured so far in the buffer. Can be " + "used to track live progress.", + ) + """Number of bytes captured so far in the buffer. Can be used to track live progress.""" + + self.count_capture_kilobytes: Parameter = self.add_parameter( + "count_capture_kilobytes", + label="captured kilobytes", + get_cmd="CAPTUREPROG?", + unit="kB", + docstring="Number of kilobytes captured so far in the buffer, " + "rounded-up to 2 kilobyte chunks. Capture must be " + "stopped before requesting the value of this " + "parameter. If the acquisition wrapped during operating " + "in Continuous mode, then the returned value is " + "simply equal to the current capture length.", + ) + """ + Number of kilobytes captured so far in the buffer, rounded-up to 2 kilobyte chunks. + Capture must be stopped before requesting the value of this parameter. + If the acquisition wrapped during operating in Continuous mode, + then the returned value is simply equal to the current capture length. + """ + + for parameter_name in ["X", "Y", "R", "T"]: + self.add_parameter(parameter_name, parameter_class=SR86xBufferReadout) + + def snapshot_base( + self, + update: bool | None = False, + params_to_skip_update: Sequence[str] | None = None, + ) -> dict[Any, Any]: + if params_to_skip_update is None: + params_to_skip_update = [] + # we omit count_capture_kilobytes from the snapshot because + # it can only be read after a completed capture and will + # timeout otherwise when the snapshot is updated, e.g. at + # station creation time + params_to_skip_update = list(params_to_skip_update) + params_to_skip_update.append("count_capture_kilobytes") + + snapshot = super().snapshot_base(update, params_to_skip_update) + return snapshot + + def _set_capture_len_parser(self, capture_length_in_kb: int) -> int: + """ + Parse the capture length in kB according to the way buffer treats it + (refer to the manual for details). The given value has to fit in the + range and has to be even, otherwise this function raises exceptions. + + Args: + capture_length_in_kb: The desired capture length in kB. + + Returns: + capture_length_in_kb + + """ + if capture_length_in_kb % 2: + raise ValueError("The capture length should be an even number") + + if ( + not self.min_capture_length_in_kb + <= capture_length_in_kb + <= self.max_capture_length_in_kb + ): + raise ValueError( + f"The capture length should be between " + f"{self.min_capture_length_in_kb} and " + f"{self.max_capture_length_in_kb}" + ) + + return capture_length_in_kb + + def set_capture_rate_to_maximum(self) -> None: + """ + Sets the capture rate to maximum. The maximum capture rate is + retrieved from the device, and depends on the current value of the + time constant. + """ + self.capture_rate(self.capture_rate_max()) + + def _set_capture_rate_parser(self, capture_rate_hz: float) -> int: + """ + According to the manual, the capture rate query returns a value in + Hz, but then setting this value it is expected to give a value n, + where the capture rate in Hz is given by + capture_rate_hz = max_rate / 2 ** n. Please see page 136 of the + manual. Here n is an integer in the range [0, 20]. + + Args: + capture_rate_hz: The desired capture rate in Hz. If the desired + rate is more than 1 Hz from the nearest valid rate, a warning + is issued and the nearest valid rate it used. + + Returns: + n_round + + """ + max_rate = self.capture_rate_max() + n = np.log2(max_rate / capture_rate_hz) + n_round = round(n) + + if not 0 <= n_round <= 20: + raise ValueError( + f"The chosen frequency is invalid. Please " + f"consult the SR860 manual at page 136. " + f"The maximum capture rate is {max_rate}" + ) + + nearest_valid_rate = max_rate / 2**n_round + if abs(capture_rate_hz - nearest_valid_rate) > 1: + available_frequencies = ", ".join( + str(f) for f in self.available_frequencies + ) + log.warning(f"Warning: Setting capture rate to {nearest_valid_rate:.5} Hz") + log.warning(f"The available frequencies are: {available_frequencies}") + + return n_round + + def start_capture(self, acquisition_mode: str, trigger_mode: str) -> None: + """ + Start an acquisition. Please see page 137 of the manual for a detailed + explanation. + + Args: + acquisition_mode: "ONE" | "CONT" + trigger_mode: "IMM" | "TRIG" | "SAMP" + + """ + + if acquisition_mode not in ["ONE", "CONT"]: + raise ValueError("The acquisition mode needs to be either 'ONE' or 'CONT'") + + if trigger_mode not in ["IMM", "TRIG", "SAMP"]: + raise ValueError( + "The trigger mode needs to be either 'IMM', 'TRIG' or 'SAMP'" + ) + + cmd_str = f"CAPTURESTART {acquisition_mode}, {trigger_mode}" + self.write(cmd_str) + + def stop_capture(self) -> None: + """Stop a capture""" + self.write("CAPTURESTOP") + + def _get_list_of_capture_variable_names(self) -> list[str]: + """ + Retrieve the list of names of variables (readouts) that are + set to be captured + """ + return self.capture_config().split(",") + + def _get_number_of_capture_variables(self) -> int: + """ + Retrieve the number of variables (readouts) that are + set to be captured + """ + capture_variables = self._get_list_of_capture_variable_names() + n_variables = len(capture_variables) + return n_variables + + def _calc_capture_size_in_kb(self, sample_count: int) -> int: + """ + Given the number of samples to capture, calculate the capture length + that the buffer needs to be set to in order to fit the requested + number of samples. Note that the number of activated readouts is + taken into account. + """ + n_variables = self._get_number_of_capture_variables() + total_size_in_kb = int( + np.ceil(n_variables * sample_count * self.bytes_per_sample / 1024) + ) + # Make sure that the total size in kb is an even number, as expected by + # the instrument + if total_size_in_kb % 2: + total_size_in_kb += 1 + return total_size_in_kb + + def set_capture_length_to_fit_samples(self, sample_count: int) -> None: + """ + Set the capture length of the buffer to fit the given number of + samples. + + Args: + sample_count: Number of samples that the buffer has to fit + + """ + total_size_in_kb = self._calc_capture_size_in_kb(sample_count) + self.capture_length_in_kb(total_size_in_kb) + + def wait_until_samples_captured(self, sample_count: int) -> None: + """ + Wait until the given number of samples is captured. This function + is blocking and has to be used with caution because it does not have + a timeout. + + Args: + sample_count: Number of samples that needs to be captured + + """ + n_captured_bytes = 0 + n_variables = self._get_number_of_capture_variables() + n_bytes_to_capture = sample_count * n_variables * self.bytes_per_sample + while n_captured_bytes < n_bytes_to_capture: + n_captured_bytes = self.count_capture_bytes() + + def get_capture_data(self, sample_count: int) -> dict[str, npt.NDArray]: + """ + Read the given number of samples of the capture data from the buffer. + + Args: + sample_count: number of samples to read from the buffer + + Returns: + The keys in the dictionary correspond to the captured + variables. For instance, if before the capture, the capture + config was set as 'capture_config("X,Y")', then the keys will + be "X" and "Y". The values in the dictionary are numpy arrays + of numbers. + + """ + total_size_in_kb = self._calc_capture_size_in_kb(sample_count) + capture_variables = self._get_list_of_capture_variable_names() + n_variables = self._get_number_of_capture_variables() + + values = self._get_raw_capture_data(total_size_in_kb) + + # Remove zeros which mark the end part of the buffer that is not + # filled with captured data + values = values[values != 0] + + values = values.reshape((-1, n_variables)).T + values = values[:, :sample_count] + + data = {k: v for k, v in zip(capture_variables, values)} + + for capture_variable in capture_variables: + buffer_parameter = getattr(self, capture_variable) + buffer_parameter.prepare_readout(data[capture_variable]) + + return data + + def _get_raw_capture_data(self, size_in_kb: int) -> npt.NDArray: + """ + Read data from the buffer from its beginning avoiding the instrument + limit of 64 kilobytes per reading. + + Args: + size_in_kb :Size of the data that needs to be read; if it exceeds + the capture length, an exception is raised. + + Returns: + A one-dimensional numpy array of the requested data. Note that the + returned array contains data for all the variables that are + mentioned in the capture config. + + """ + current_capture_length = self.capture_length_in_kb() + if size_in_kb > current_capture_length: + raise ValueError( + f"The size of the requested data ({size_in_kb}kB) " + f"is larger than current capture length of the " + f"buffer ({current_capture_length}kB)." + ) + + values: npt.NDArray = np.array([]) + data_size_to_read_in_kb = size_in_kb + n_readings = 0 + + while data_size_to_read_in_kb > 0: + offset = n_readings * self.max_size_per_reading_in_kb + + if data_size_to_read_in_kb > self.max_size_per_reading_in_kb: + size_of_this_reading = self.max_size_per_reading_in_kb + else: + size_of_this_reading = data_size_to_read_in_kb + + data_from_this_reading = self._get_raw_capture_data_block( + size_of_this_reading, offset_in_kb=offset + ) + values = np.append(values, data_from_this_reading) + + data_size_to_read_in_kb -= size_of_this_reading + n_readings += 1 + + return values + + def _get_raw_capture_data_block( + self, size_in_kb: int, offset_in_kb: int = 0 + ) -> npt.NDArray: + """ + Read data from the buffer. The maximum amount of data that can be + read with this function (size_in_kb) is 64kB (this limitation comes + from the instrument). The offset argument can be used to navigate + along the buffer. + + An exception will be raised if either size_in_kb or offset_in_kb are + longer that the *current* capture length (number of kB of data that is + captured so far rounded up to 2kB chunks). If (offset_in_kb + + size_in_kb) is longer than the *current* capture length, + the instrument returns the wrapped data. + + For more information, refer to the description of the "CAPTUREGET" + command in the manual. + + Args: + size_in_kb: Amount of data in kB that is to be read from the buffer + offset_in_kb: Offset within the buffer of where to read the data; + for example, when 0 is specified, the data is read from the + start of the buffer. + + Returns: + A one-dimensional numpy array of the requested data. Note that the + returned array contains data for all the variables that are + mentioned in the capture config. + + """ + if size_in_kb > self.max_size_per_reading_in_kb: + raise ValueError( + f"The size of the requested data ({size_in_kb}kB) " + f"is larger than maximum size that can be read " + f"at once ({self.max_size_per_reading_in_kb}kB)." + ) + + # Calculate the size of the data captured so far, in kB, rounded up + # to 2kB chunks + size_of_currently_captured_data = int( + np.ceil(np.ceil(self.count_capture_bytes() / 1024) / 2) * 2 + ) + + if size_in_kb > size_of_currently_captured_data: + raise ValueError( + f"The size of the requested data ({size_in_kb}kB) " + f"cannot be larger than the size of currently " + f"captured data rounded up to 2kB chunks " + f"({size_of_currently_captured_data}kB)" + ) + + if offset_in_kb > size_of_currently_captured_data: + raise ValueError( + f"The offset for reading the requested data " + f"({offset_in_kb}kB) cannot be larger than the " + f"size of currently captured data rounded up to " + f"2kB chunks " + f"({size_of_currently_captured_data}kB)" + ) + + values = self._parent.visa_handle.query_binary_values( + f"CAPTUREGET? {offset_in_kb}, {size_in_kb}", + datatype="f", + is_big_endian=False, + expect_termination=False, + ) + # the sr86x does not include an extra termination char on binary + # messages so we set expect_termination to False + + return np.array(values) + + def capture_one_sample_per_trigger( + self, trigger_count: int, start_triggers_pulsetrain: Callable[..., Any] + ) -> dict[str, npt.NDArray]: + """ + Capture one sample per each trigger, and return when the specified + number of triggers has been received. + + Args: + trigger_count: Number of triggers to capture samples for + start_triggers_pulsetrain: By calling this *non-blocking* + function, the train of trigger pulses should start + + Returns: + The keys in the dictionary correspond to the captured + variables. For instance, if before the capture, the capture + config was set as 'capture_config("X,Y")', then the keys will + be "X" and "Y". The values in the dictionary are numpy arrays + of numbers. + + """ + self.set_capture_length_to_fit_samples(trigger_count) + self.start_capture("ONE", "SAMP") + start_triggers_pulsetrain() + self.wait_until_samples_captured(trigger_count) + self.stop_capture() + return self.get_capture_data(trigger_count) + + def capture_samples_after_trigger( + self, sample_count: int, send_trigger: Callable[..., Any] + ) -> dict[str, npt.NDArray]: + """ + Capture a number of samples after a trigger has been received. + Please refer to page 135 of the manual for details. + + Args: + sample_count: Number of samples to capture + send_trigger: By calling this *non-blocking* function, one trigger + should be sent that will initiate the capture + + Returns: + The keys in the dictionary correspond to the captured + variables. For instance, if before the capture, the capture + config was set as 'capture_config("X,Y")', then the keys will + be "X" and "Y". The values in the dictionary are numpy arrays + of numbers. + + """ + self.set_capture_length_to_fit_samples(sample_count) + self.start_capture("ONE", "TRIG") + send_trigger() + self.wait_until_samples_captured(sample_count) + self.stop_capture() + return self.get_capture_data(sample_count) + + def capture_samples(self, sample_count: int) -> dict[str, npt.NDArray]: + """ + Capture a number of samples at a capture rate, starting immediately. + Unlike the "continuous" capture mode, here the buffer does not get + overwritten with the new data once the buffer is full. + + The function blocks until the required number of samples is acquired, + and returns them. + + Args: + sample_count: Number of samples to capture + + Returns: + The keys in the dictionary correspond to the captured + variables. For instance, if before the capture, the capture + config was set as 'capture_config("X,Y")', then the keys will + be "X" and "Y". The values in the dictionary are numpy arrays + of numbers. + + """ + self.set_capture_length_to_fit_samples(sample_count) + self.start_capture("ONE", "IMM") + self.wait_until_samples_captured(sample_count) + self.stop_capture() + return self.get_capture_data(sample_count) + + +class SR86xDataChannel(InstrumentChannel): + """ + Implements a data channel of SR86x lock-in amplifier. Parameters that are + assigned to these channels get plotted on the display of the instrument. + Moreover, there are commands that allow to conveniently retrieve the values + of the parameters that are currently assigned to the data channels. + + This class relies on the available parameter names that should be + mentioned in the lock-in amplifier class in `PARAMETER_NAMES` attribute. + + Args: + parent: an instance of SR86x driver + name: data channel name that is to be used to reference it from the + parent + cmd_id: this ID is used in VISA commands to refer to this data channel, + usually is an integer number + cmd_id_name: this name can also be used in VISA commands along with + channel_id; it is not used in this implementation, but is added + for reference + color: every data channel is also referred to by the color with which it + is being plotted on the instrument's screen; added here only for + reference + + """ + + def __init__( + self, + parent: SR86x, + name: str, + cmd_id: str, + cmd_id_name: str | None = None, + color: str | None = None, + **kwargs: Unpack[InstrumentBaseKWArgs], + ) -> None: + super().__init__(parent, name, **kwargs) + + self._cmd_id = cmd_id + self._cmd_id_name = cmd_id_name + self._color = color + + self.assigned_parameter: Parameter = self.add_parameter( + "assigned_parameter", + label=f"Data channel {cmd_id} parameter", + docstring=f"Allows to set and get the " + f"parameter that is assigned to data " + f"channel {cmd_id}", + set_cmd=f"CDSP {cmd_id}, {{}}", + get_cmd=f"CDSP? {cmd_id}", + val_mapping=self.parent.PARAMETER_NAMES, + ) + """Allows to set and get the parameter that is assigned to the channel""" + + @property + def cmd_id(self) -> str: + return self._cmd_id + + @property + def cmd_id_name(self) -> str | None: + return self._cmd_id_name + + @property + def color(self) -> str | None: + return self._color diff --git a/src/qcodes/instrument_drivers/stanford_research/generate_code.py b/src/qcodes/instrument_drivers/stanford_research/generate_code.py new file mode 100644 index 000000000000..655d48686c4b --- /dev/null +++ b/src/qcodes/instrument_drivers/stanford_research/generate_code.py @@ -0,0 +1,14 @@ +from jinja2 import Environment, FileSystemLoader + +env = Environment( + loader=FileSystemLoader("."), + lstrip_blocks=True, + trim_blocks=True, + keep_trailing_newline=True, +) +template = env.get_template("SR86x.jinja") + +output = template.render() + +with open("SR86x.py", "w") as f: + f.write(output) From f0c2d7542188d06b0fa290a1aa92598be09d4364 Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Tue, 30 Sep 2025 14:49:31 +0200 Subject: [PATCH 2/6] Add note --- src/qcodes/instrument_drivers/stanford_research/SR86x.jinja | 3 +++ src/qcodes/instrument_drivers/stanford_research/SR86x.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/qcodes/instrument_drivers/stanford_research/SR86x.jinja b/src/qcodes/instrument_drivers/stanford_research/SR86x.jinja index 772a3dbaef57..9f107cc4fe25 100644 --- a/src/qcodes/instrument_drivers/stanford_research/SR86x.jinja +++ b/src/qcodes/instrument_drivers/stanford_research/SR86x.jinja @@ -1,3 +1,6 @@ +# NOTE SR86x.py is generated from a template file (SR86x.jinja). +# To make changes, edit the template file and regenerate. +# Any changes to SR86x.py will be overwritten. from __future__ import annotations import logging diff --git a/src/qcodes/instrument_drivers/stanford_research/SR86x.py b/src/qcodes/instrument_drivers/stanford_research/SR86x.py index c06203087bdf..9e8585488da5 100644 --- a/src/qcodes/instrument_drivers/stanford_research/SR86x.py +++ b/src/qcodes/instrument_drivers/stanford_research/SR86x.py @@ -1,3 +1,6 @@ +# NOTE SR86x.py is generated from a template file (SR86x.jinja). +# To make changes, edit the template file and regenerate. +# Any changes to SR86x.py will be overwritten. from __future__ import annotations import logging From 258e47909c5a092f888835331528db1a3c9c5419 Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Tue, 30 Sep 2025 14:54:12 +0200 Subject: [PATCH 3/6] Rename --- .../stanford_research/generate_code.py => _generate_code.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename src/qcodes/instrument_drivers/stanford_research/generate_code.py => _generate_code.py (59%) diff --git a/src/qcodes/instrument_drivers/stanford_research/generate_code.py b/_generate_code.py similarity index 59% rename from src/qcodes/instrument_drivers/stanford_research/generate_code.py rename to _generate_code.py index 655d48686c4b..435d614c375b 100644 --- a/src/qcodes/instrument_drivers/stanford_research/generate_code.py +++ b/_generate_code.py @@ -1,7 +1,7 @@ from jinja2 import Environment, FileSystemLoader env = Environment( - loader=FileSystemLoader("."), + loader=FileSystemLoader("src/qcodes/instrument_drivers/stanford_research"), lstrip_blocks=True, trim_blocks=True, keep_trailing_newline=True, @@ -10,5 +10,5 @@ output = template.render() -with open("SR86x.py", "w") as f: +with open("src/qcodes/instrument_drivers/stanford_research/SR86x.py", "w") as f: f.write(output) From d1846902efa2449921f71f44039ef0a45fba65ad Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 1 Oct 2025 07:55:31 +0200 Subject: [PATCH 4/6] Run generate as a precommit hook --- .pre-commit-config.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 73e6a542c39a..d8e66a2813e5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,3 +29,10 @@ repos: rev: 3.0.0 hooks: - id: shellcheck + - repo: local + hooks: + - id: generate-jinja2-code + name: Generate Jinja2 Code + entry: uv run --extra test _generate_code.py + language: system + files: '\.jinja$' From 27db2c12ee5ec20a0e3a6e2a894e497761e16165 Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 1 Oct 2025 08:02:04 +0200 Subject: [PATCH 5/6] Add metadata to generate script --- _generate_code.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/_generate_code.py b/_generate_code.py index 435d614c375b..56f988ed8de3 100644 --- a/_generate_code.py +++ b/_generate_code.py @@ -1,3 +1,10 @@ +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "jinja2>=3.1.6", +# ] +# /// + from jinja2 import Environment, FileSystemLoader env = Environment( From 241271a94332055d5b140ec2fff6dde2eb35b2e6 Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 1 Oct 2025 08:04:33 +0200 Subject: [PATCH 6/6] Ensure that correct lineendings are used in generated code --- _generate_code.py | 1 + 1 file changed, 1 insertion(+) diff --git a/_generate_code.py b/_generate_code.py index 56f988ed8de3..a78718bf726e 100644 --- a/_generate_code.py +++ b/_generate_code.py @@ -12,6 +12,7 @@ lstrip_blocks=True, trim_blocks=True, keep_trailing_newline=True, + newline_sequence="\n", ) template = env.get_template("SR86x.jinja")