From d2883530316b0e56539945b44ea73796d18f7c53 Mon Sep 17 00:00:00 2001 From: Kyung-hoon-Jung0 Date: Wed, 21 Jan 2026 00:15:17 +0900 Subject: [PATCH] chore: fix the parameter hierachy in Parameter class --- .../24_coupler_flux_long_distortion.py | 288 ++++++++---------- 1 file changed, 127 insertions(+), 161 deletions(-) diff --git a/qualibration_graphs/superconducting/calibrations/1Q_calibrations/24_coupler_flux_long_distortion.py b/qualibration_graphs/superconducting/calibrations/1Q_calibrations/24_coupler_flux_long_distortion.py index 1e5b953fd..2fa9d9a6d 100644 --- a/qualibration_graphs/superconducting/calibrations/1Q_calibrations/24_coupler_flux_long_distortion.py +++ b/qualibration_graphs/superconducting/calibrations/1Q_calibrations/24_coupler_flux_long_distortion.py @@ -5,7 +5,7 @@ import warnings from dataclasses import asdict -from typing import ClassVar, List, Literal +from typing import List, Literal, Optional import matplotlib.pyplot as plt import numpy as np @@ -24,11 +24,7 @@ from qualibrate import NodeParameters, QualibrationNode from qualibration_libs.core import tracked_updates from qualibration_libs.data import XarrayDataFetcher -from qualibration_libs.parameters import ( - CommonNodeParameters, - QubitPairExperimentNodeParameters, - get_qubit_pairs, -) +from qualibration_libs.parameters import CommonNodeParameters from qualibration_libs.runtime import simulate_and_plot from quam_config import Quam @@ -36,12 +32,13 @@ class Parameters( NodeParameters, CommonNodeParameters, - QubitPairExperimentNodeParameters, ): """Parameters for pi vs coupler flux calibration node.""" - targets_name: ClassVar[str] = "qubit_pairs" - + qubits: Optional[List[str]] = None + """List of qubit names to calibrate. If None or empty, all active qubits are used.""" + coupler: Optional[str] = None + """Name of the coupler to use for flux pulsing (e.g., 'coupler_q1_q2').""" num_shots: int = 30 """Number of shots to acquire.""" operation: str = "x180" @@ -70,8 +67,6 @@ class Parameters( """Whether to update the state. CAUTION: assumes fitting will be acceptable""" update_state_from_GUI: bool = False """Whether to update the state from the GUI, select when fitting is successful""" - measure_qubit: Literal["control", "target"] = "target" - """Which qubit to measure from the pair: 'control' or 'target'. Default is 'target'.""" coupler_flux_amplitude: float = 0.1 """Amplitude of the coupler flux pulse in Volts. Default is 0.1V.""" @@ -129,27 +124,28 @@ def custom_param(node: QualibrationNode[Parameters, Quam]): def create_qua_program(node: QualibrationNode[Parameters, Quam]): """Create the sweep axes and generate the QUA program for pi vs coupler flux measurement.""" u = unit(coerce_to_integer=True) - # Get qubit pairs and extract measured qubits - node.namespace["qubit_pairs"] = qubit_pairs = get_qubit_pairs(node) - num_qubit_pairs = len(qubit_pairs) - - # Extract the measured qubits based on measure_qubit parameter - measured_qubits = [] - for qp in qubit_pairs: - if node.parameters.measure_qubit == "control": - measured_qubits.append(qp.qubit_control) - else: - measured_qubits.append(qp.qubit_target) - node.namespace["measured_qubits"] = measured_qubits - # Also set in qubits for compatibility with analysis functions - node.namespace["qubits"] = measured_qubits + machine = node.machine + + # Get the qubits from parameters or use active qubits + if not node.parameters.qubits: + qubits = machine.active_qubits + else: + qubits = [machine.qubits[q] for q in node.parameters.qubits] + + assert len(qubits) == 1, "This node only supports one qubit at a time." + qubit = qubits[0] + + # Get the coupler from parameters + assert node.parameters.coupler is not None, "Coupler parameter must be specified." + coupler = machine.qubit_pairs[node.parameters.coupler].coupler + + node.namespace["qubits"] = [qubit] + node.namespace["coupler"] = coupler operation_name = node.parameters.operation # Ensure operation exists and default to x180 if not - for qubit in measured_qubits: - if hasattr(qubit.xy.operations, operation_name): - continue + if not hasattr(qubit.xy.operations, operation_name): warnings.warn(f"Qubit {qubit.name} has no operation '{operation_name}', defaulting to 'x180'") operation_name = "x180" @@ -180,44 +176,41 @@ def create_qua_program(node: QualibrationNode[Parameters, Quam]): # Coupler flux amplitude from parameter coupler_flux_amp = node.parameters.coupler_flux_amplitude - # Sweep axes for data fetcher - use qubit_pair dimension node.namespace["sweep_axes"] = { - "qubit_pair": xr.DataArray(qubit_pairs.get_names()), + "qubit": xr.DataArray([qubit.name]), "detuning": xr.DataArray(dfs, attrs={"long_name": "qubit frequency", "units": "Hz"}), "time": xr.DataArray(4 * times, attrs={"long_name": "Flux pulse duration", "units": "ns"}), } # Track LO updates tracked_qubits = [] - if_update = [] - - for q in measured_qubits: - # Decide if updating the LO is needed depending on the detuning request - if ( - q.xy.intermediate_frequency - - node.parameters.detuning_in_mhz * 1e6 - - node.parameters.frequency_span_in_mhz * 1e6 / 2 - ) < -400e6: - node.parameters.reset_type = "thermal" # Active reset will not work if the lo is changed - warnings.warn( - "Qubit LO has been changed to reach desired detuning, " - "active reset will not work. Reset type changed to thermal." - ) - if_update.append(0) - # track the LO and IF changes to revert later - with tracked_updates(q, auto_revert=False, dont_assign_to_none=False) as q_upd: - rf_frequency = q_upd.xy.intermediate_frequency + q_upd.xy.opx_output.upconverter_frequency - lo_frequency = q_upd.xy.opx_output.upconverter_frequency - node.parameters.detuning_in_mhz * 1e6 - if (q_upd.xy.opx_output.band == 3) and (lo_frequency < 6.5e9): - raise ValueError("Requested detuning is too large for the given MW FEM band") - if (q_upd.xy.opx_output.band == 2) and (lo_frequency < 4.5e9): - raise ValueError("Requested detuning is too large for the given MW FEM band") - print(f"Updating {q_upd.name} LO to {lo_frequency}") - q_upd.xy.opx_output.upconverter_frequency = lo_frequency - q_upd.xy.RF_frequency -= node.parameters.detuning_in_mhz * 1e6 - tracked_qubits.append(q_upd) - else: - if_update.append(int(node.parameters.detuning_in_mhz)) + if_update = 0 + + # Decide if updating the LO is needed depending on the detuning request + if ( + qubit.xy.intermediate_frequency + - node.parameters.detuning_in_mhz * 1e6 + - node.parameters.frequency_span_in_mhz * 1e6 / 2 + ) < -400e6: + node.parameters.reset_type = "thermal" # Active reset will not work if the lo is changed + warnings.warn( + "Qubit LO has been changed to reach desired detuning, " + "active reset will not work. Reset type changed to thermal." + ) + if_update = 0 + # track the LO and IF changes to revert later + with tracked_updates(qubit, auto_revert=False, dont_assign_to_none=False) as q_upd: + lo_frequency = q_upd.xy.opx_output.upconverter_frequency - node.parameters.detuning_in_mhz * 1e6 + if (q_upd.xy.opx_output.band == 3) and (lo_frequency < 6.5e9): + raise ValueError("Requested detuning is too large for the given MW FEM band") + if (q_upd.xy.opx_output.band == 2) and (lo_frequency < 4.5e9): + raise ValueError("Requested detuning is too large for the given MW FEM band") + print(f"Updating {q_upd.name} LO to {lo_frequency}") + q_upd.xy.opx_output.upconverter_frequency = lo_frequency + q_upd.xy.RF_frequency -= node.parameters.detuning_in_mhz * 1e6 + tracked_qubits.append(q_upd) + else: + if_update = int(node.parameters.detuning_in_mhz) node.namespace["if_update"] = if_update node.namespace["tracked_qubits"] = tracked_qubits @@ -225,83 +218,60 @@ def create_qua_program(node: QualibrationNode[Parameters, Quam]): with program() as qua_prog: I, I_st, Q, Q_st, n, n_st = node.machine.declare_qua_variables() if node.parameters.use_state_discrimination: - state = [declare(int) for _ in range(num_qubit_pairs)] - state_st = [declare_stream() for _ in range(num_qubit_pairs)] + state = [declare(int) for _ in range(1)] + state_st = [declare_stream() for _ in range(1)] df = declare(int) t_delay = declare(int) - for multiplexed_qubit_pairs in qubit_pairs.batch(): - # Place qubits to their respective flux point (initialize both qubits in pair) - for qp in multiplexed_qubit_pairs.values(): - node.machine.initialize_qpu(target=qp.qubit_control) - node.machine.initialize_qpu(target=qp.qubit_target) - align() - # Averaging loop - with for_(n, 0, n < node.parameters.num_shots, n + 1): - save(n, n_st) - # Qubit spectroscopy frequency loop - with for_(*from_array(df, dfs)): - # Time delay loop - with for_each_(t_delay, times): - # Reset the measured qubits - for i, qp in multiplexed_qubit_pairs.items(): - # Select the measured qubit based on measure_qubit parameter - if node.parameters.measure_qubit == "control": - qubit = qp.qubit_control - else: - qubit = qp.qubit_target - qubit.reset(node.parameters.reset_type, node.parameters.simulate) - align() - - for i, qp in multiplexed_qubit_pairs.items(): - # Select the measured qubit based on measure_qubit parameter - if node.parameters.measure_qubit == "control": - qubit = qp.qubit_control - else: - qubit = qp.qubit_target - - # Step the qubit spectroscopy tone frequency - qubit.xy.update_frequency(df + qubit.xy.intermediate_frequency - if_update[i]) - qubit.align() - # Play the coupler flux pulse - qp.coupler.play( - "const", - amplitude_scale=coupler_flux_amp / qp.coupler.operations["const"].amplitude, - duration=t_delay + 200, - ) - # Wait for a variable time - qubit.xy.wait(t_delay) - # Play the qubit spectroscopy pulse - qubit.xy.play(operation_name, amplitude_scale=operation_amp_scale) - qubit.xy.update_frequency(qubit.xy.intermediate_frequency) - qubit.align() - qubit.wait(200) - - for i, qp in multiplexed_qubit_pairs.items(): - # Select the measured qubit based on measure_qubit parameter - if node.parameters.measure_qubit == "control": - qubit = qp.qubit_control - else: - qubit = qp.qubit_target - - if node.parameters.use_state_discrimination: - qubit.readout_state(state[i]) - save(state[i], state_st[i]) - else: - qubit.resonator.measure("readout", qua_vars=(I[i], Q[i])) - save(I[i], I_st[i]) - save(Q[i], Q_st[i]) - align() + # Initialize the QPU for the qubit + node.machine.initialize_qpu(target=qubit) + align() + + # Averaging loop + with for_(n, 0, n < node.parameters.num_shots, n + 1): + save(n, n_st) + # Qubit spectroscopy frequency loop + with for_(*from_array(df, dfs)): + # Time delay loop + with for_each_(t_delay, times): + # Reset the qubit + qubit.reset(node.parameters.reset_type, node.parameters.simulate) + align() + + # Step the qubit spectroscopy tone frequency + qubit.xy.update_frequency(df + qubit.xy.intermediate_frequency - if_update) + qubit.align() + # Play the coupler flux pulse + coupler.play( + "const", + amplitude_scale=coupler_flux_amp / coupler.operations["const"].amplitude, + duration=t_delay + 200, + ) + # Wait for a variable time + qubit.xy.wait(t_delay) + # Play the qubit spectroscopy pulse + qubit.xy.play(operation_name, amplitude_scale=operation_amp_scale) + qubit.xy.update_frequency(qubit.xy.intermediate_frequency) + qubit.align() + qubit.wait(200) + + if node.parameters.use_state_discrimination: + qubit.readout_state(state[0]) + save(state[0], state_st[0]) + else: + qubit.resonator.measure("readout", qua_vars=(I[0], Q[0])) + save(I[0], I_st[0]) + save(Q[0], Q_st[0]) + align() with stream_processing(): n_st.save("n") - for i in range(num_qubit_pairs): - if node.parameters.use_state_discrimination: - state_st[i].buffer(len(times)).buffer(len(dfs)).average().save(f"state{i + 1}") - else: - I_st[i].buffer(len(times)).buffer(len(dfs)).average().save(f"I{i + 1}") - Q_st[i].buffer(len(times)).buffer(len(dfs)).average().save(f"Q{i + 1}") + if node.parameters.use_state_discrimination: + state_st[0].buffer(len(times)).buffer(len(dfs)).average().save("state1") + else: + I_st[0].buffer(len(times)).buffer(len(dfs)).average().save("I1") + Q_st[0].buffer(len(times)).buffer(len(dfs)).average().save("Q1") node.namespace["qua_program"] = qua_prog @@ -329,11 +299,6 @@ def execute_qua_program(node: QualibrationNode[Parameters, Quam]): for dataset in data_fetcher: progress_counter(data_fetcher.get("n", 0), node.parameters.num_shots, start_time=data_fetcher.t_start) node.log(job.execution_report()) - # Rename qubit_pair dimension to qubit for compatibility with analysis functions - if "qubit_pair" in dataset.dims: - qubit_names = [q.name for q in node.namespace["measured_qubits"]] - dataset = dataset.rename({"qubit_pair": "qubit"}) - dataset = dataset.assign_coords(qubit=qubit_names) node.results["ds_raw"] = dataset @@ -345,24 +310,25 @@ def load_data(node: QualibrationNode[Parameters, Quam]): node.load_from_id(load_id) node.parameters.load_data_id = load_id - # Get qubit pairs and extract measured qubits - node.namespace["qubit_pairs"] = qubit_pairs = get_qubit_pairs(node) - measured_qubits = [] - for qp in qubit_pairs: - if node.parameters.measure_qubit == "control": - measured_qubits.append(qp.qubit_control) - else: - measured_qubits.append(qp.qubit_target) - node.namespace["measured_qubits"] = measured_qubits - node.namespace["qubits"] = measured_qubits - - # Rename qubit_pair dimension to qubit for compatibility with analysis functions - if "qubit_pair" in node.results["ds_raw"].dims: - qubit_names = [q.name for q in measured_qubits] - node.results["ds_raw"] = node.results["ds_raw"].rename({"qubit_pair": "qubit"}) - node.results["ds_raw"] = node.results["ds_raw"].assign_coords(qubit=qubit_names) - - # Overwrite the loaded node parrameters with the ones defined from the GUI + machine = node.machine + + # Get the qubits from parameters or use active qubits + if not node.parameters.qubits: + qubits = machine.active_qubits + else: + qubits = [machine.qubits[q] for q in node.parameters.qubits] + + assert len(qubits) == 1, "This node only supports one qubit at a time." + qubit = qubits[0] + + # Get the coupler from parameters + assert node.parameters.coupler is not None, "Coupler parameter must be specified." + coupler = machine.qubit_pairs[node.parameters.coupler].coupler + + node.namespace["qubits"] = [qubit] + node.namespace["coupler"] = coupler + + # Overwrite the loaded node parameters with the ones defined from the GUI node.parameters.fitting_base_fractions = loaded_fractions node.parameters.update_state_from_GUI = stored_gui_update_flag if node.parameters.update_state_from_GUI: @@ -411,17 +377,17 @@ def update_state(node: QualibrationNode[Parameters, Quam]): """Update IIR filter tabs on coupler if fitting was successful.""" if not node.parameters.update_state: return - qubit_pairs = node.namespace["qubit_pairs"] - measured_qubits = node.namespace["qubits"] + + coupler = node.namespace["coupler"] + qubits = node.namespace["qubits"] # Initialize coupler exponential_filter if needed - for qp in qubit_pairs: - coupler_out = qp.coupler.opx_output - if coupler_out.exponential_filter is None: - coupler_out.exponential_filter = [] + coupler_out = coupler.opx_output + if coupler_out.exponential_filter is None: + coupler_out.exponential_filter = [] with node.record_state_updates(): - for qp, q in zip(qubit_pairs, measured_qubits): + for q in qubits: res = node.results["fit_results"][q.name] # Support dict or dataclass fit_success = res["fit_successful"] @@ -431,8 +397,8 @@ def update_state(node: QualibrationNode[Parameters, Quam]): components = res["a_tau_tuple"] A_list = [amp / best_a_dc for amp, _ in components] tau_list = [tau for _, tau in components] - qp.coupler.opx_output.exponential_filter.extend(list(zip(A_list, tau_list))) - print(f"Updated {qp.coupler.name} filter to: {qp.coupler.opx_output.exponential_filter}") + coupler.opx_output.exponential_filter.extend(list(zip(A_list, tau_list))) + print(f"Updated {coupler.name} filter to: {coupler.opx_output.exponential_filter}") # %% {Save_results}