Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import warnings
from dataclasses import asdict
from typing import ClassVar, List, Literal
from typing import List, Literal, Optional

import matplotlib.pyplot as plt
import numpy as np
Expand All @@ -24,24 +24,21 @@
from qualibrate import NodeParameters, QualibrationNode
from qualibration_libs.core import tracked_updates
from qualibration_libs.data import XarrayDataFetcher
from qualibration_libs.parameters import (
CommonNodeParameters,
QubitPairExperimentNodeParameters,
get_qubit_pairs,
)
from qualibration_libs.parameters import CommonNodeParameters
from qualibration_libs.runtime import simulate_and_plot
from quam_config import Quam


class Parameters(
NodeParameters,
CommonNodeParameters,
QubitPairExperimentNodeParameters,
):
"""Parameters for pi vs coupler flux calibration node."""

targets_name: ClassVar[str] = "qubit_pairs"

qubits: Optional[List[str]] = None
"""List of qubit names to calibrate. If None or empty, all active qubits are used."""
coupler: Optional[str] = None
"""Name of the coupler to use for flux pulsing (e.g., 'coupler_q1_q2')."""
num_shots: int = 30
"""Number of shots to acquire."""
operation: str = "x180"
Expand Down Expand Up @@ -70,8 +67,6 @@ class Parameters(
"""Whether to update the state. CAUTION: assumes fitting will be acceptable"""
update_state_from_GUI: bool = False
"""Whether to update the state from the GUI, select when fitting is successful"""
measure_qubit: Literal["control", "target"] = "target"
"""Which qubit to measure from the pair: 'control' or 'target'. Default is 'target'."""
coupler_flux_amplitude: float = 0.1
"""Amplitude of the coupler flux pulse in Volts. Default is 0.1V."""

Expand Down Expand Up @@ -129,27 +124,28 @@ def custom_param(node: QualibrationNode[Parameters, Quam]):
def create_qua_program(node: QualibrationNode[Parameters, Quam]):
"""Create the sweep axes and generate the QUA program for pi vs coupler flux measurement."""
u = unit(coerce_to_integer=True)
# Get qubit pairs and extract measured qubits
node.namespace["qubit_pairs"] = qubit_pairs = get_qubit_pairs(node)
num_qubit_pairs = len(qubit_pairs)

# Extract the measured qubits based on measure_qubit parameter
measured_qubits = []
for qp in qubit_pairs:
if node.parameters.measure_qubit == "control":
measured_qubits.append(qp.qubit_control)
else:
measured_qubits.append(qp.qubit_target)
node.namespace["measured_qubits"] = measured_qubits
# Also set in qubits for compatibility with analysis functions
node.namespace["qubits"] = measured_qubits
machine = node.machine

# Get the qubits from parameters or use active qubits
if not node.parameters.qubits:
qubits = machine.active_qubits
else:
qubits = [machine.qubits[q] for q in node.parameters.qubits]

assert len(qubits) == 1, "This node only supports one qubit at a time."
qubit = qubits[0]

# Get the coupler from parameters
assert node.parameters.coupler is not None, "Coupler parameter must be specified."
coupler = machine.qubit_pairs[node.parameters.coupler].coupler

node.namespace["qubits"] = [qubit]
node.namespace["coupler"] = coupler

operation_name = node.parameters.operation

# Ensure operation exists and default to x180 if not
for qubit in measured_qubits:
if hasattr(qubit.xy.operations, operation_name):
continue
if not hasattr(qubit.xy.operations, operation_name):
warnings.warn(f"Qubit {qubit.name} has no operation '{operation_name}', defaulting to 'x180'")
operation_name = "x180"

Expand Down Expand Up @@ -180,128 +176,102 @@ def create_qua_program(node: QualibrationNode[Parameters, Quam]):
# Coupler flux amplitude from parameter
coupler_flux_amp = node.parameters.coupler_flux_amplitude

# Sweep axes for data fetcher - use qubit_pair dimension
node.namespace["sweep_axes"] = {
"qubit_pair": xr.DataArray(qubit_pairs.get_names()),
"qubit": xr.DataArray([qubit.name]),
"detuning": xr.DataArray(dfs, attrs={"long_name": "qubit frequency", "units": "Hz"}),
"time": xr.DataArray(4 * times, attrs={"long_name": "Flux pulse duration", "units": "ns"}),
}

# Track LO updates
tracked_qubits = []
if_update = []

for q in measured_qubits:
# Decide if updating the LO is needed depending on the detuning request
if (
q.xy.intermediate_frequency
- node.parameters.detuning_in_mhz * 1e6
- node.parameters.frequency_span_in_mhz * 1e6 / 2
) < -400e6:
node.parameters.reset_type = "thermal" # Active reset will not work if the lo is changed
warnings.warn(
"Qubit LO has been changed to reach desired detuning, "
"active reset will not work. Reset type changed to thermal."
)
if_update.append(0)
# track the LO and IF changes to revert later
with tracked_updates(q, auto_revert=False, dont_assign_to_none=False) as q_upd:
rf_frequency = q_upd.xy.intermediate_frequency + q_upd.xy.opx_output.upconverter_frequency
lo_frequency = q_upd.xy.opx_output.upconverter_frequency - node.parameters.detuning_in_mhz * 1e6
if (q_upd.xy.opx_output.band == 3) and (lo_frequency < 6.5e9):
raise ValueError("Requested detuning is too large for the given MW FEM band")
if (q_upd.xy.opx_output.band == 2) and (lo_frequency < 4.5e9):
raise ValueError("Requested detuning is too large for the given MW FEM band")
print(f"Updating {q_upd.name} LO to {lo_frequency}")
q_upd.xy.opx_output.upconverter_frequency = lo_frequency
q_upd.xy.RF_frequency -= node.parameters.detuning_in_mhz * 1e6
tracked_qubits.append(q_upd)
else:
if_update.append(int(node.parameters.detuning_in_mhz))
if_update = 0

# Decide if updating the LO is needed depending on the detuning request
if (
qubit.xy.intermediate_frequency
- node.parameters.detuning_in_mhz * 1e6
- node.parameters.frequency_span_in_mhz * 1e6 / 2
) < -400e6:
node.parameters.reset_type = "thermal" # Active reset will not work if the lo is changed
warnings.warn(
"Qubit LO has been changed to reach desired detuning, "
"active reset will not work. Reset type changed to thermal."
)
if_update = 0
# track the LO and IF changes to revert later
with tracked_updates(qubit, auto_revert=False, dont_assign_to_none=False) as q_upd:
lo_frequency = q_upd.xy.opx_output.upconverter_frequency - node.parameters.detuning_in_mhz * 1e6
if (q_upd.xy.opx_output.band == 3) and (lo_frequency < 6.5e9):
raise ValueError("Requested detuning is too large for the given MW FEM band")
if (q_upd.xy.opx_output.band == 2) and (lo_frequency < 4.5e9):
raise ValueError("Requested detuning is too large for the given MW FEM band")
print(f"Updating {q_upd.name} LO to {lo_frequency}")
q_upd.xy.opx_output.upconverter_frequency = lo_frequency
q_upd.xy.RF_frequency -= node.parameters.detuning_in_mhz * 1e6
tracked_qubits.append(q_upd)
else:
if_update = int(node.parameters.detuning_in_mhz)

node.namespace["if_update"] = if_update
node.namespace["tracked_qubits"] = tracked_qubits

with program() as qua_prog:
I, I_st, Q, Q_st, n, n_st = node.machine.declare_qua_variables()
if node.parameters.use_state_discrimination:
state = [declare(int) for _ in range(num_qubit_pairs)]
state_st = [declare_stream() for _ in range(num_qubit_pairs)]
state = [declare(int) for _ in range(1)]
state_st = [declare_stream() for _ in range(1)]

df = declare(int)
t_delay = declare(int)

for multiplexed_qubit_pairs in qubit_pairs.batch():
# Place qubits to their respective flux point (initialize both qubits in pair)
for qp in multiplexed_qubit_pairs.values():
node.machine.initialize_qpu(target=qp.qubit_control)
node.machine.initialize_qpu(target=qp.qubit_target)
align()
# Averaging loop
with for_(n, 0, n < node.parameters.num_shots, n + 1):
save(n, n_st)
# Qubit spectroscopy frequency loop
with for_(*from_array(df, dfs)):
# Time delay loop
with for_each_(t_delay, times):
# Reset the measured qubits
for i, qp in multiplexed_qubit_pairs.items():
# Select the measured qubit based on measure_qubit parameter
if node.parameters.measure_qubit == "control":
qubit = qp.qubit_control
else:
qubit = qp.qubit_target
qubit.reset(node.parameters.reset_type, node.parameters.simulate)
align()

for i, qp in multiplexed_qubit_pairs.items():
# Select the measured qubit based on measure_qubit parameter
if node.parameters.measure_qubit == "control":
qubit = qp.qubit_control
else:
qubit = qp.qubit_target

# Step the qubit spectroscopy tone frequency
qubit.xy.update_frequency(df + qubit.xy.intermediate_frequency - if_update[i])
qubit.align()
# Play the coupler flux pulse
qp.coupler.play(
"const",
amplitude_scale=coupler_flux_amp / qp.coupler.operations["const"].amplitude,
duration=t_delay + 200,
)
# Wait for a variable time
qubit.xy.wait(t_delay)
# Play the qubit spectroscopy pulse
qubit.xy.play(operation_name, amplitude_scale=operation_amp_scale)
qubit.xy.update_frequency(qubit.xy.intermediate_frequency)
qubit.align()
qubit.wait(200)

for i, qp in multiplexed_qubit_pairs.items():
# Select the measured qubit based on measure_qubit parameter
if node.parameters.measure_qubit == "control":
qubit = qp.qubit_control
else:
qubit = qp.qubit_target

if node.parameters.use_state_discrimination:
qubit.readout_state(state[i])
save(state[i], state_st[i])
else:
qubit.resonator.measure("readout", qua_vars=(I[i], Q[i]))
save(I[i], I_st[i])
save(Q[i], Q_st[i])
align()
# Initialize the QPU for the qubit
node.machine.initialize_qpu(target=qubit)
align()

# Averaging loop
with for_(n, 0, n < node.parameters.num_shots, n + 1):
save(n, n_st)
# Qubit spectroscopy frequency loop
with for_(*from_array(df, dfs)):
# Time delay loop
with for_each_(t_delay, times):
# Reset the qubit
qubit.reset(node.parameters.reset_type, node.parameters.simulate)
align()

# Step the qubit spectroscopy tone frequency
qubit.xy.update_frequency(df + qubit.xy.intermediate_frequency - if_update)
qubit.align()
# Play the coupler flux pulse
coupler.play(
"const",
amplitude_scale=coupler_flux_amp / coupler.operations["const"].amplitude,
duration=t_delay + 200,
)
# Wait for a variable time
qubit.xy.wait(t_delay)
# Play the qubit spectroscopy pulse
qubit.xy.play(operation_name, amplitude_scale=operation_amp_scale)
qubit.xy.update_frequency(qubit.xy.intermediate_frequency)
qubit.align()
qubit.wait(200)

if node.parameters.use_state_discrimination:
qubit.readout_state(state[0])
save(state[0], state_st[0])
else:
qubit.resonator.measure("readout", qua_vars=(I[0], Q[0]))
save(I[0], I_st[0])
save(Q[0], Q_st[0])
align()

with stream_processing():
n_st.save("n")
for i in range(num_qubit_pairs):
if node.parameters.use_state_discrimination:
state_st[i].buffer(len(times)).buffer(len(dfs)).average().save(f"state{i + 1}")
else:
I_st[i].buffer(len(times)).buffer(len(dfs)).average().save(f"I{i + 1}")
Q_st[i].buffer(len(times)).buffer(len(dfs)).average().save(f"Q{i + 1}")
if node.parameters.use_state_discrimination:
state_st[0].buffer(len(times)).buffer(len(dfs)).average().save("state1")
else:
I_st[0].buffer(len(times)).buffer(len(dfs)).average().save("I1")
Q_st[0].buffer(len(times)).buffer(len(dfs)).average().save("Q1")

node.namespace["qua_program"] = qua_prog

Expand Down Expand Up @@ -329,11 +299,6 @@ def execute_qua_program(node: QualibrationNode[Parameters, Quam]):
for dataset in data_fetcher:
progress_counter(data_fetcher.get("n", 0), node.parameters.num_shots, start_time=data_fetcher.t_start)
node.log(job.execution_report())
# Rename qubit_pair dimension to qubit for compatibility with analysis functions
if "qubit_pair" in dataset.dims:
qubit_names = [q.name for q in node.namespace["measured_qubits"]]
dataset = dataset.rename({"qubit_pair": "qubit"})
dataset = dataset.assign_coords(qubit=qubit_names)
node.results["ds_raw"] = dataset


Expand All @@ -345,24 +310,25 @@ def load_data(node: QualibrationNode[Parameters, Quam]):
node.load_from_id(load_id)
node.parameters.load_data_id = load_id

# Get qubit pairs and extract measured qubits
node.namespace["qubit_pairs"] = qubit_pairs = get_qubit_pairs(node)
measured_qubits = []
for qp in qubit_pairs:
if node.parameters.measure_qubit == "control":
measured_qubits.append(qp.qubit_control)
else:
measured_qubits.append(qp.qubit_target)
node.namespace["measured_qubits"] = measured_qubits
node.namespace["qubits"] = measured_qubits

# Rename qubit_pair dimension to qubit for compatibility with analysis functions
if "qubit_pair" in node.results["ds_raw"].dims:
qubit_names = [q.name for q in measured_qubits]
node.results["ds_raw"] = node.results["ds_raw"].rename({"qubit_pair": "qubit"})
node.results["ds_raw"] = node.results["ds_raw"].assign_coords(qubit=qubit_names)

# Overwrite the loaded node parrameters with the ones defined from the GUI
machine = node.machine

# Get the qubits from parameters or use active qubits
if not node.parameters.qubits:
qubits = machine.active_qubits
else:
qubits = [machine.qubits[q] for q in node.parameters.qubits]

assert len(qubits) == 1, "This node only supports one qubit at a time."
qubit = qubits[0]

# Get the coupler from parameters
assert node.parameters.coupler is not None, "Coupler parameter must be specified."
coupler = machine.qubit_pairs[node.parameters.coupler].coupler

node.namespace["qubits"] = [qubit]
node.namespace["coupler"] = coupler

# Overwrite the loaded node parameters with the ones defined from the GUI
node.parameters.fitting_base_fractions = loaded_fractions
node.parameters.update_state_from_GUI = stored_gui_update_flag
if node.parameters.update_state_from_GUI:
Expand Down Expand Up @@ -411,17 +377,17 @@ def update_state(node: QualibrationNode[Parameters, Quam]):
"""Update IIR filter tabs on coupler if fitting was successful."""
if not node.parameters.update_state:
return
qubit_pairs = node.namespace["qubit_pairs"]
measured_qubits = node.namespace["qubits"]

coupler = node.namespace["coupler"]
qubits = node.namespace["qubits"]

# Initialize coupler exponential_filter if needed
for qp in qubit_pairs:
coupler_out = qp.coupler.opx_output
if coupler_out.exponential_filter is None:
coupler_out.exponential_filter = []
coupler_out = coupler.opx_output
if coupler_out.exponential_filter is None:
coupler_out.exponential_filter = []

with node.record_state_updates():
for qp, q in zip(qubit_pairs, measured_qubits):
for q in qubits:
res = node.results["fit_results"][q.name]
# Support dict or dataclass
fit_success = res["fit_successful"]
Expand All @@ -431,8 +397,8 @@ def update_state(node: QualibrationNode[Parameters, Quam]):
components = res["a_tau_tuple"]
A_list = [amp / best_a_dc for amp, _ in components]
tau_list = [tau for _, tau in components]
qp.coupler.opx_output.exponential_filter.extend(list(zip(A_list, tau_list)))
print(f"Updated {qp.coupler.name} filter to: {qp.coupler.opx_output.exponential_filter}")
coupler.opx_output.exponential_filter.extend(list(zip(A_list, tau_list)))
print(f"Updated {coupler.name} filter to: {coupler.opx_output.exponential_filter}")


# %% {Save_results}
Expand Down
Loading