
Adding a full nectarchain calibration pipeline to the TRR #264

Open
hashkar wants to merge 12 commits into cta-observatory:main from hashkar:TRR_charge

Conversation


@hashkar hashkar commented Feb 12, 2026

This script performs a complete calibration including:

  1. Pedestal computation
  2. Gain (SPE fit) computation
  3. Flatfield computation
  4. Charge extraction
  5. Calibrated charge computation (pedestal subtraction, gain correction, FF correction)
  6. Plotting of all calibration parameters individually and vs temperature

The script is designed to be flexible and configurable via command-line arguments,
allowing users to specify run numbers, processing options, and output directories.
It also includes robust path resolution logic to handle the various output locations
used by the nectarchain tools.

Depending on the mode, it can run and plot each entity separately (pedestals, gain, FF and charge),
or it can run in full mode and compute calibrated charges by chaining charge computation, pedestal subtraction, and gain and FF corrections.

Example usage in full mode:

python nectarcam_full_calibration_analysis.py \
    --mode full \
    --pedestal_runs 7020 7077 \
    --gain_runs 7066 7123 \
    --flatfield_runs 7066 7123 \
    --charge_runs 7020 7077 \
    --temperatures 25 20 \
    --max_events_gain 5000

Run python nectarcam_full_calibration_analysis.py -h for more options.
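As a minimal sketch of the full-mode arithmetic described above (pedestal subtraction, then gain correction, then FF correction), assuming NumPy arrays of shape (events, gains, pixels) for the charges and (gains, pixels) for the calibration constants; the shapes, seed, and constant values here are invented for illustration, not taken from the script:

```python
import numpy as np

# Invented shapes/values for the sketch: 4 events, 2 gain channels, 5 pixels
n_events, n_gains, n_pixels = 4, 2, 5

rng = np.random.default_rng(0)
charges = rng.normal(300.0, 10.0, size=(n_events, n_gains, n_pixels))  # ADC
pedestals = np.full((n_gains, n_pixels), 250.0)      # ADC
gains = np.stack([np.full(n_pixels, 58.0),           # HG, ADC/p.e.
                  np.full(n_pixels, 4.46)])          # LG, ADC/p.e.
ff_coef = np.ones((n_gains, n_pixels))               # flat-field coefficients

# Broadcasting: (events, gains, pixels) op (gains, pixels)
calibrated = (charges - pedestals) / gains * ff_coef

print(calibrated.shape)  # (4, 2, 5)
```

The assumed order of corrections follows the step list above; the actual script may differ in detail.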


codecov bot commented Feb 12, 2026

Codecov Report

❌ Patch coverage is 0% with 869 lines in your changes missing coverage. Please review.
✅ Project coverage is 46.02%. Comparing base (884264f) to head (4a4ef0e).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...archain/trr_test_suite/observation_temperatures.py 0.00% 869 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #264      +/-   ##
==========================================
- Coverage   51.90%   46.02%   -5.89%     
==========================================
  Files          82       83       +1     
  Lines        6845     7709     +864     
==========================================
- Hits         3553     3548       -5     
- Misses       3292     4161     +869     


@jlenain jlenain added the enhancement New feature or request label Feb 12, 2026
@pcorcam pcorcam left a comment


Hi @hashkar !

Thanks for all the work on this PR. I've taken a global look at the script, and at first sight the functionality looks good to me. I just would not name it a "calibration pipeline" - more like an "observation temperature" TRR use-case script.

I do have quite a few major comments though. My main concern is that you "re-invent the wheel" a few times when it comes to reading h5 files, finding existing files, and defining output directories. This leads to a lot of (unnecessary) code duplication, which we should avoid.

In addition, I think we should not use default values as an option for the TRR, but more as a failsafe if, for some reason, a calibration tool did not successfully execute. But we should always try to run a calibration tool if it is needed for a subsequent step in the pipeline.

Just FYI, I have not looked into detail into the production of the plots yet, because I would like to focus on these major concerns first. In any case, I am willing to contribute to further development of this PR if you would find it useful.

Thanks again!

    return parser


class NectarCAMCalibrationPipeline:

I would suggest to rename NectarCAMCalibrationPipeline to something like ObservationTemperaturePipeline. This is more descriptive in the TRR context, and gives a better distinction between this class and the PipelineNectarCAMCalibrationTool that already exists in nectarchain.

Comment on lines +523 to +791
    def _explore_hdf5_keys(self, f, max_depth=8):
        """
        Recursively collect all Dataset paths in an HDF5 file/group using
        h5py's own type system. Only ``h5py.Dataset`` objects are added as
        leaves; ``h5py.Group`` objects are always recursed into regardless of
        whether they appear empty to a ``hasattr(keys)`` check.

        This avoids the bug where PyTables Group nodes look like leaf objects
        to a generic ``hasattr`` test, causing the caller to receive group
        paths instead of dataset paths.
        """
        paths = []

        def _recurse(obj, path, depth):
            if depth > max_depth:
                return
            if isinstance(obj, h5py.Dataset):
                paths.append(path)
            elif isinstance(obj, h5py.Group):
                for k in obj.keys():
                    _recurse(obj[k], f"{path}/{k}", depth + 1)
            # anything else (e.g. NamedType) is silently skipped

        _recurse(f, "", 0)
        return paths

    def _load_hg_lg_from_hdf5(self, filepath, kind):
        """
        Load (high_gain, low_gain) arrays from an HDF5 file produced by any
        nectarchain calibration tool.

        Returns (hg_array, lg_array) where each is a 1D array of shape
        (n_pixels,), or raises KeyError with a full path dump.
        """
        import h5py
        import tables

        with h5py.File(filepath, "r") as f:
            # Collect all Dataset paths (Groups are recursed, never returned)
            all_keys = self._explore_hdf5_keys(f)

            # Also collect top-level Group names for debug when no datasets found
            top_groups = list(f.keys())
            self.log.debug(
                f"[{kind}] {filepath}\n"
                f"  top groups : {top_groups}\n"
                f"  datasets   : {all_keys}"
            )

        # ------------------------------------------------------------------
        # PEDESTAL
        # ------------------------------------------------------------------
        if kind == "pedestal":
            # The paths exist but are PyTables structured arrays;
            # use PyTables to read them
            h5file = tables.open_file(str(filepath), mode="r")
            try:
                # Try data_combined first (final merged data)
                for node_path in [
                    "/data_combined/NectarCAMPedestalContainer_0",
                    "/data_1/NectarCAMPedestalContainer_0",
                ]:
                    if node_path in h5file:
                        table = h5file.get_node(node_path)
                        if len(table) > 0:
                            record = table[0]
                            ped_hg = record["pedestal_mean_hg"]  # (pixels, samples)
                            ped_lg = record["pedestal_mean_lg"]
                            h5file.close()

                            self.log.debug(f"  pedestal raw shape: {ped_hg.shape}")

                            # Average over samples to get (pixels,)
                            if ped_hg.ndim > 1:
                                ped_hg = np.mean(ped_hg, axis=1)
                                ped_lg = np.mean(ped_lg, axis=1)

                            self.log.debug(f"  pedestal final shape: {ped_hg.shape}")
                            return ped_hg, ped_lg
                h5file.close()
            except Exception as e:
                logging.exception(f"Error reading pedestal from {filepath}: {e}")
                if h5file.isopen:
                    h5file.close()
                raise

        # ------------------------------------------------------------------
        # GAIN
        # ------------------------------------------------------------------
        elif kind == "gain":
            # Path exists: /data/SPEfitContainer_0
            # It's a PyTables structured array
            h5file = tables.open_file(str(filepath), mode="r")
            try:
                if "/data/SPEfitContainer_0" in h5file:
                    table = h5file.get_node("/data/SPEfitContainer_0")
                    if len(table) > 0:
                        record = table[0]
                        gain_hg = record["high_gain"]
                        gain_lg = record["low_gain"]
                        h5file.close()

                        self.log.debug(
                            f"  gain shapes: HG={gain_hg.shape}, LG={gain_lg.shape}"
                        )

                        # Flatten to 1D if needed
                        if gain_hg.ndim > 1:
                            # Take mean or just flatten - depends on structure
                            if gain_hg.shape[1] == 3:
                                # [value, error_low, error_high]
                                # Take first column (the actual value)
                                gain_hg = gain_hg[:, 0]
                                gain_lg = gain_lg[:, 0]
                            else:
                                # Unknown structure, flatten
                                gain_hg = gain_hg.ravel()
                                gain_lg = gain_lg.ravel()

                        self.log.debug(
                            f"  gain shapes: HG={gain_hg.shape}, LG={gain_lg.shape}"
                        )
                        return gain_hg, gain_lg
                h5file.close()
            except Exception as e:
                logging.exception(f"Error {filepath}: {e}")
                if h5file.isopen:
                    h5file.close()
                raise

        # ------------------------------------------------------------------
        # FLATFIELD
        # ------------------------------------------------------------------
        elif kind == "flatfield":
            # Path exists: /data/FlatFieldContainer_0
            # It's a PyTables structured array
            h5file = tables.open_file(str(filepath), mode="r")
            try:
                if "/data/FlatFieldContainer_0" in h5file:
                    table = h5file.get_node("/data/FlatFieldContainer_0")
                    if len(table) > 0:
                        record = table[0]
                        if "FF_coef" in record.dtype.names:
                            ff_coef = record["FF_coef"]  # Shape varies
                            h5file.close()

                            self.log.debug(f"  FF_coef raw shape: {ff_coef.shape}")

                            # Average over events if 3D
                            if ff_coef.ndim == 3:
                                ff_coef = np.mean(ff_coef, axis=0)

                            # Now should be (gains, pixels)
                            if ff_coef.ndim == 2 and ff_coef.shape[0] >= 2:
                                ff_hg = ff_coef[0]
                                ff_lg = ff_coef[1]
                            elif ff_coef.ndim == 1:
                                ff_hg = ff_coef
                                ff_lg = ff_coef
                            else:
                                # Unexpected shape
                                self.log.warning(
                                    f"  Unexpected FF_coef shape: {ff_coef.shape}"
                                )
                                if ff_coef.ndim >= 2:
                                    ff_hg = ff_coef[0]
                                    ff_lg = ff_coef[0]
                                else:
                                    ff_hg = ff_coef
                                    ff_lg = ff_coef

                            # Check for inf/nan and replace with 1.0
                            if not np.all(np.isfinite(ff_hg)):
                                n_bad = np.sum(~np.isfinite(ff_hg))
                                self.log.warning(
                                    f"  FF_coef HG has {n_bad} non-finite values, "
                                    f"replacing with 1.0"
                                )
                                ff_hg = np.where(np.isfinite(ff_hg), ff_hg, 1.0)

                            if not np.all(np.isfinite(ff_lg)):
                                n_bad = np.sum(~np.isfinite(ff_lg))
                                self.log.warning(
                                    f"  FF_coef LG has {n_bad} non-finite values, "
                                    f"replacing with 1.0"
                                )
                                ff_lg = np.where(np.isfinite(ff_lg), ff_lg, 1.0)

                            self.log.debug(
                                f"  FF_coef final: HG shape={ff_hg.shape}, "
                                f"mean={np.mean(ff_hg):.3f}, LG shape={ff_lg.shape}, "
                                f"mean={np.mean(ff_lg):.3f}"
                            )

                            return ff_hg, ff_lg
                h5file.close()
            except Exception as e:
                logging.exception(f"Error {filepath}: {e}")
                if h5file.isopen:
                    h5file.close()
                raise

        # ------------------------------------------------------------------
        # CHARGE
        # ------------------------------------------------------------------
        elif kind == "charge":
            # Paths exist:
            #   /data/ChargesContainer_0/FLATFIELD,
            #   /data/ChargesContainer_0/SKY_PEDESTAL
            # These are PyTables structured arrays
            h5file = tables.open_file(str(filepath), mode="r")
            try:
                if "/data/ChargesContainer_0" in h5file:
                    container = h5file.get_node("/data/ChargesContainer_0")

                    hg_parts, lg_parts = [], []

                    # Iterate over trigger datasets
                    for child in container._f_iter_nodes("Table"):
                        if len(child) > 0:
                            record = child[0]
                            if "charges_hg" in record.dtype.names:
                                charges_hg = record["charges_hg"]
                                charges_lg = record["charges_lg"]

                                self.log.debug(
                                    f"  {child._v_name} raw shapes: "
                                    f"HG={charges_hg.shape}, LG={charges_lg.shape}"
                                )

                                # Flatten to (events, pixels)
                                hg_parts.append(
                                    charges_hg.reshape(-1, charges_hg.shape[-1])
                                )
                                lg_parts.append(
                                    charges_lg.reshape(-1, charges_lg.shape[-1])
                                )

                    h5file.close()

                    if hg_parts:
                        hg = np.concatenate(hg_parts, axis=0)
                        lg = np.concatenate(lg_parts, axis=0)

                        self.log.debug(
                            f"  charge final shapes: HG={hg.shape}, LG={lg.shape}"
                        )

                        # Return mean over events to get (pixels,)
                        return np.mean(hg, axis=0), np.mean(lg, axis=0)

                h5file.close()
            except Exception as e:
                logging.exception(f"Error {filepath}: {e}")
                if h5file.isopen:
                    h5file.close()
                raise

        raise KeyError(
            f"Cannot find {kind} data in {filepath}.\n"
            f"  Top-level groups : {top_groups}\n"
            f"  All dataset paths: {all_keys}"
        )

There is already a built-in method in nectarchain to read the data containers written on h5 output files. Please use that to read the h5 files in order to not introduce more code duplication in nectarchain; through the containers you can directly access all the data fields you need. An example would be something like:

pedestal_container = next(NectarCAMPedestalContainer.from_hdf5("/path/to/pedestal.h5"))
pedestal_hg = pedestal_container["pedestal_mean_hg"]

Comment on lines +1115 to +1132
        # Waveforms step
        wfs_tool = WaveformsNectarCAMCalibrationTool(
            progress_bar=True,
            camera=self.args.camera,
            run_number=run_number,
            max_events=self.args.max_events_charge,
            log_level=logging.getLevelName(self.log.level),
            overwrite=self.args.recompute_charge or self.args.recompute_all,
        )

        try:
            wfs_tool.setup()
            wfs_tool.start()
            wfs_tool.finish()
            self.log.info(f"Waveforms extracted for run {run_number}")
        except Exception as e:
            self.log.error(f"Error extracting waveforms: {e}", exc_info=True)
            raise

Why include this step? Waveforms are called automatically when extracting the charge using the ChargesNectarCAMCalibrationTool. Saving waveforms also takes up a lot of disk space. I suggest to remove it.

Comment on lines +1334 to +1442
        # Load charge data – full event-by-event array needed (not mean).
        # Structure: data/ChargesContainer_0/<TRIGGER_TYPE>/charges_hg|lg
        # where TRIGGER_TYPE is a PyTables structured array dataset
        try:
            import tables

            # Use PyTables to read the structured arrays
            h5file = tables.open_file(str(charge_path), mode="r")

            try:
                container = h5file.root.data.ChargesContainer_0

                hg_parts, lg_parts = [], []

                # Load only FLATFIELD trigger type
                if "FLATFIELD" in container._v_children:
                    dataset = container._f_get_child("FLATFIELD")

                    if len(dataset) > 0:
                        # Access the structured array
                        data_record = dataset[0]

                        # Check if it has a charges_hg field
                        if "charges_hg" in dataset.dtype.names:
                            charges_hg = data_record["charges_hg"]
                            charges_lg = data_record["charges_lg"]

                            self.log.info(
                                f"  Loaded charges from FLATFIELD: "
                                f"HG shape {charges_hg.shape}, "
                                f"LG shape {charges_lg.shape}"
                            )

                            # Handle shape mismatch
                            if charges_hg.shape != charges_lg.shape:
                                self.log.warning(
                                    f"HG: {charges_hg.shape}, "
                                    f"LG: {charges_lg.shape}"
                                )
                                # Truncate to common shape
                                min_events = min(
                                    charges_hg.shape[0], charges_lg.shape[0]
                                )
                                min_pixels = min(
                                    charges_hg.shape[1], charges_lg.shape[1]
                                )
                                charges_hg = charges_hg[:min_events, :min_pixels]
                                charges_lg = charges_lg[:min_events, :min_pixels]
                                self.log.info(
                                    f"  Truncated to: {charges_hg.shape}"
                                )

                            # Flatten if 3D
                            if charges_hg.ndim == 3:
                                charges_hg = charges_hg.reshape(
                                    -1, charges_hg.shape[-1]
                                )
                                charges_lg = charges_lg.reshape(
                                    -1, charges_lg.shape[-1]
                                )

                            hg_parts.append(charges_hg)
                            lg_parts.append(charges_lg)
                else:
                    raise KeyError(
                        "FLATFIELD dataset not found in ChargesContainer_0"
                    )

                h5file.close()

                if not hg_parts:
                    raise KeyError(
                        f"Cannot find fields in ChargesContainer_0. "
                        f"Available datasets: {list(container._v_children.keys())}"
                    )

                # Concatenate all trigger types
                charges_hg = np.concatenate(hg_parts, axis=0)  # (total_events, n_pixels)
                charges_lg = np.concatenate(lg_parts, axis=0)
                charges = np.stack([charges_hg, charges_lg], axis=1)
                # charges: (total_events, 2, n_pixels)

                self.log.info(
                    f"  Total charge shape: {charges.shape} "
                    f"({len(hg_parts)} trigger types combined)"
                )

            except Exception as e:
                logging.exception(f"Error {charge_path}: {e}")
                if h5file.isopen:
                    h5file.close()
                raise

        except Exception as e:
            self.log.error(f"Error loading charge data: {e}", exc_info=True)
            return None

        # Load pedestal data (with defaults if needed)
        ped_hg, ped_lg = self._load_or_default_calibration(pedestal_run, "pedestal")
        pedestals = np.stack([ped_hg, ped_lg], axis=0)
        self.log.info(
            f"  Pedestals: HG mean={np.mean(ped_hg):.2f}, shape={ped_hg.shape}"
        )

        # Load gain data (with defaults if needed)
        gain_hg, gain_lg = self._load_or_default_calibration(gain_run, "gain")
        gains = np.stack([gain_hg, gain_lg], axis=0)
        self.log.info(f"  Gains: HG mean={np.mean(gain_hg):.2f}, shape={gain_hg.shape}")

        # Load flatfield data (with defaults if needed)
        ff_hg, ff_lg = self._load_or_default_calibration(flatfield_run, "flatfield")
        ff_coef = np.stack([ff_hg, ff_lg], axis=0)
        self.log.info(f"  FF coef: HG mean={np.mean(ff_hg):.3f}, shape={ff_hg.shape}")

        self.gain_dir = self.data_dir / "gains"
        self.flatfield_dir = self.data_dir / "flatfields"
        self.charge_dir = self.data_dir / "charges"
        self.calibrated_dir = self.data_dir / "calibrated_charges"

You don't use this, please remove

        self.log.info(f"  ff_coef shape: {ff_coef.shape}")

        # Vectorized calibration (much faster than loops)
        ped_subtracted = charges - pedestals[np.newaxis, :, :]  # Broadcast pedestals
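A minimal sketch of the broadcast on the shapes this snippet assumes: charges is (n_events, 2, n_pixels) and pedestals is (2, n_pixels). Inserting a leading axis with np.newaxis makes the intent explicit, but NumPy's broadcasting rules produce the same result without it; the toy shapes below are invented for illustration:

```python
import numpy as np

# Toy shapes: 2 events, 2 gain channels, 3 pixels
charges = np.arange(2 * 2 * 3, dtype=float).reshape(2, 2, 3)
pedestals = np.full((2, 3), 1.0)

explicit = charges - pedestals[np.newaxis, :, :]  # shape (1, 2, 3) broadcast up
implicit = charges - pedestals                    # identical by broadcasting rules

assert np.array_equal(explicit, implicit)
print(explicit.shape)  # (2, 2, 3)
```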

        self.pedestal_results[run_number] = output_path
        self.log.info(f"Pedestal saved to {output_path}")

        return output_path

I understand the philosophy, but I think the implementation can be simplified quite a bit (reference to comments https://github.com/cta-observatory/nectarchain/pull/264/changes#r2813004175 and https://github.com/cta-observatory/nectarchain/pull/264/changes#r2813213916). You can always initialize the tool, and then only run it if the corresponding output path does not exist. Something like:

tool = PedestalNectarCAMCalibrationTool(**kwargs)

if not os.path.exists(tool.output_path):
    run_tool(tool)

        kwargs["events_per_slice"] = self.args.gain_events_per_slice
        return kwargs

    def compute_gain(self, run_number, max_events=None):

        hi_lo_ratio = gain[constants.HIGH_GAIN] / gain[constants.LOW_GAIN]
        return hi_lo_ratio

    def compute_flatfield(self, run_number):

Comment on lines +1212 to +1282
    def _load_or_default_calibration(self, run_number, kind):
        """
        Load calibration data for a run, with fallback to default values.

        Returns (hg_array, lg_array) tuple.

        Default values:
          - pedestal: 250 ADC for both HG and LG
          - gain: 58.0 ADC/p.e. for HG, 4.46 ADC/p.e. for LG
          - flatfield: 1.0 for both HG and LG
        """
        n_pixels = constants.N_PIXELS

        # Define defaults
        defaults = {
            "pedestal": (
                np.full(n_pixels, 250.0),  # HG
                np.full(n_pixels, 250.0),  # LG
            ),
            "gain": (
                np.full(n_pixels, 58.0),  # HG
                np.full(n_pixels, 4.46),  # LG (58/13)
            ),
            "flatfield": (
                np.full(n_pixels, 1.0),  # HG
                np.full(n_pixels, 1.0),  # LG
            ),
        }

        # Try to load from file
        try:
            if kind == "pedestal":
                path = self._get_result_path(
                    self.pedestal_results,
                    run_number,
                    self.get_pedestal_output_path,
                    "pedestal",
                )
            elif kind == "gain":
                path = self._get_result_path(
                    self.gain_results,
                    run_number,
                    self.get_gain_output_path,
                    "gain",
                )
            elif kind == "flatfield":
                path = self._get_result_path(
                    self.flatfield_results,
                    run_number,
                    lambda r: self.get_flatfield_output_path(r, iteration=2),
                    "flatfield",
                )
            else:
                raise ValueError(f"Unknown calibration kind: {kind}")

            # Load from HDF5
            hg, lg = self._load_hg_lg_from_hdf5(path, kind)
            self.log.info(f"✓ Loaded {kind} from {path}")
            return hg, lg

        except (FileNotFoundError, KeyError, Exception) as e:
            self.log.warning(
                f"Cannot load {kind} for run {run_number}: {e}. "
                f"Using default values."
            )
            hg_default, lg_default = defaults[kind]
            self.log.info(
                f"  {kind.capitalize()} defaults: "
                f"HG={hg_default[0]:.2f}, LG={lg_default[0]:.2f}"
            )
            return hg_default, lg_default

I don’t know if allowing for default values here is something that we want to do in the TRR use-case. The problem I see is the following: If you e.g. compute the flat-field coefficients with default gain values, it will give you a certain output file. Let’s say that later you compute the gain, and relaunch the flat-field computation. Since it will already find the corresponding flatfield output file (the one with default values), it will not be updated. So you’d have to manually remove the flatfield output file first, which is not user friendly; also your results would change.

I think using default values should always be seen as a failsafe if for some reason the calculation of coefficients failed for example. My suggestion: When computing the flatfield, if the gain/pedestal have not been computed yet, force their computation. Similar for calibrated charge. Only fall back to defaults as a "last resort".
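The "force prerequisites, defaults only as a last resort" policy suggested here could be sketched roughly as follows. All function and variable names below (ensure_calibration, the compute lambda, the paths) are invented for illustration and do not reflect the actual nectarchain API:

```python
from pathlib import Path

def ensure_calibration(kind, run_number, results, compute, default):
    """Return the output path for `kind`, forcing its computation if missing;
    fall back to `default` only if the computation itself fails."""
    path = results.get(run_number)
    if path is not None and Path(path).exists():
        return path  # already computed, reuse
    try:
        path = compute(run_number)   # force the prerequisite computation
        results[run_number] = path
        return path
    except Exception:
        return default               # last-resort fallback

# Example: before the flat-field step, make sure the gain exists
results = {}
path = ensure_calibration(
    "gain", 7066, results,
    compute=lambda run: f"/tmp/gain_run{run}.h5",  # stand-in for compute_gain
    default=None,
)
print(path)  # "/tmp/gain_run7066.h5"
```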
