"""IMAP-HI Goodtimes processing module."""

import logging
import re
from enum import IntEnum
from pathlib import Path
from typing import Self

import numpy as np
import xarray as xr

from imap_processing.hi.utils import parse_sensor_number

logger = logging.getLogger(__name__)

# Number of spin-angle bins tracked per MET timestamp (bin indices 0-89).
N_SPIN_BINS = 90

# Structured dtype for good time intervals
INTERVAL_DTYPE = np.dtype(
    [
        ("met_start", np.float64),
        ("met_end", np.float64),
        ("spin_bin_low", np.int32),
        ("spin_bin_high", np.int32),
        ("n_good_bins", np.int32),
        ("esa_step", np.uint8),
    ]
)


class CullCode(IntEnum):
    """Cull reason codes for good/bad time classification."""

    GOOD = 0
    LOOSE = 1


# mypy doesn't like subclassing Dataset
class Goodtimes(xr.Dataset):  # type: ignore[misc]
    """
    IMAP-Hi Good Times data structure.

    Tracks good/bad time intervals for a single Pointing based on validation
    checks defined in the IMAP-Hi Algorithm Document Section 2.2.4 and 2.3.2.

    The data structure maintains a cull_flags array initialized to all zeros
    (good). As bad times are identified by validation algorithms, they are
    flagged via the `remove_times()` method with a non-zero cull code.

    Cull Codes:
        * 0 : Good time (default)
        * 1-N : Bad time, with specific cull reason code

    xarray.Dataset structure:
        * Dimensions:
            * met : int
                Number of MET timestamps (one per 8-spin histogram packet,
                ~90 per pointing)
            * spin_bin : int
                Number of spin angle bins (90 bins covering 0-360 degrees)
        * Coordinates
            * met : numpy.ndarray
                Mission Elapsed Time values for each 8-spin interval
            * spin_bin : numpy.ndarray
                Spin bin indices (0-89)
        * Data Variables
            * cull_flags : xarray.DataArray (met, spin_bin)
                Cull flags where 0=good time, non-zero=bad time with cull
                reason code
            * esa_step : xarray.DataArray (met,)
                ESA energy step for each MET timestamp
        * Attributes
            * sensor : str
                Sensor identifier built as ``f"Hi{sensor_number}"``
                (e.g. 'Hi45')
            * pointing : int
                Pointing number for this dataset
    """

    # xarray expects Dataset subclasses to declare __slots__ explicitly; this
    # class stores no per-instance state outside of the Dataset itself.
    __slots__ = ()

    @classmethod
    def from_l1a_de(cls, l1a_de: xr.Dataset) -> Self:
        """
        Create Goodtimes object from L1A Direct Event data.

        Initializes all times and spin bins as good (cull_flags=0) for complete
        8-spin periods. Since we receive one packet every 4 spins but only
        record MET every 8 spins, we expect MET values to appear in pairs. Only
        MET values that appear as duplicates (pairs) are included, as single
        occurrences indicate incomplete 8-spin periods.

        Parameters
        ----------
        l1a_de : xarray.Dataset
            L1A direct event data for this pointing. Must provide the
            ``meta_seconds``, ``meta_subseconds`` and ``esa_step`` variables on
            the ``epoch`` dimension and the ``Logical_source`` and
            ``Repointing`` attributes.

        Returns
        -------
        Goodtimes
            Initialized Goodtimes object with cull_flags set to 0 (all good)
            for complete 8-spin periods only.

        Raises
        ------
        ValueError
            If the ``Repointing`` attribute does not start with ``repoint``
            followed by five digits.
        """
        logger.info("Creating Goodtimes from L1A Direct Event data")

        # Extract MET times from packet metadata
        # Each MET represents one 8-spin histogram packet interval
        # Format: seconds + subseconds/1000
        met_all = (
            l1a_de["meta_seconds"].astype(float)
            + l1a_de["meta_subseconds"].astype(float) / 1000
        )
        logger.debug(f"Extracted {len(met_all)} total MET entries from L1A DE data")

        # Find unique MET values, their counts, and indices of first occurrences
        unique_mets, first_indices, counts = np.unique(
            met_all.values, return_index=True, return_counts=True
        )
        logger.debug(f"Found {len(unique_mets)} unique MET values")

        # Keep only MET values that appear as pairs (count == 2)
        paired_mask = counts == 2
        first_occurrence_indices = first_indices[paired_mask]

        n_paired = int(np.sum(paired_mask))
        n_unpaired = len(unique_mets) - n_paired
        logger.info(
            f"Filtered to {n_paired} complete 8-spin periods "
            f"(excluded {n_unpaired} incomplete periods)"
        )

        # np.unique returns sorted values, so the MET coordinate is ascending,
        # which remove_times() relies on when it calls np.searchsorted.
        met = unique_mets[paired_mask]
        esa_step = l1a_de["esa_step"].isel(epoch=first_occurrence_indices)

        # Create coordinates
        coords = {
            "met": met,
            "spin_bin": np.arange(N_SPIN_BINS),
        }

        # Create data variables
        # Initialize cull_flags - all good (0) by default
        # Shape: (n_met_timestamps, 90 spin_bins)
        # Per alg doc Section 2.2.4: 90-element arrays, one per histogram packet
        data_vars = {
            "cull_flags": xr.DataArray(
                np.zeros((len(met), N_SPIN_BINS), dtype=np.uint8),
                dims=["met", "spin_bin"],
            ),
            # Re-wrap on the "met" dimension: the selection above is still
            # indexed by "epoch", which would otherwise leak a third dimension
            # into the dataset and leave esa_step detached from "met".
            "esa_step": xr.DataArray(
                esa_step.values, dims=["met"], attrs=esa_step.attrs
            ),
        }

        # Create attributes
        sensor_number = parse_sensor_number(l1a_de.attrs["Logical_source"])
        repointing = l1a_de.attrs["Repointing"]
        match = re.match(r"repoint(?P<pointing_num>\d{5})", repointing)
        if not match:
            raise ValueError(
                f"Unable to parse pointing number from l1a_de Repointing "
                f"attribute: {repointing}"
            )
        attrs = {
            "sensor": f"Hi{sensor_number}",
            "pointing": int(match["pointing_num"]),
        }

        return cls(data_vars, coords, attrs)

    def remove_times(
        self,
        met: np.ndarray | float | tuple[float, float],
        bins: np.ndarray | int | None = None,
        cull: int = 1,
    ) -> None:
        """
        Flag specific MET times and spin bins as bad times with a cull code.

        This method is called by external validation algorithms when bad times
        are identified. It sets the cull_flags to the specified non-zero cull
        code for the given MET timestamps and spin bins.

        Parameters
        ----------
        met : numpy.ndarray, float, or tuple of (float, float)
            MET timestamp(s) to flag as bad. Can be:
            - Single float: one MET timestamp
            - Tuple of (start, end): time range (inclusive); every MET
              coordinate inside the range is flagged, and a range that
              contains no MET coordinate flags nothing
            - Array of floats: multiple MET timestamps
            A single value or array element is mapped to the largest MET
            coordinate that is <= the value and must lie within
            [first MET, last MET].
        bins : numpy.ndarray, int, or None
            Spin bin(s) to flag as bad. Can be:
            - None: flag all spin bins (0-89) for the given MET(s)
            - Single int: one spin bin
            - Array of ints: multiple spin bins
        cull : int
            Cull reason code (non-zero). Different validation checks can use
            different codes to identify the reason for culling:
            - 1: Loose criterion
            - etc.

        Raises
        ------
        ValueError
            If `cull` is 0, if any spin bin is outside [0, 89], or if a
            single/array MET value is outside [first MET, last MET] (which
            includes the case of a dataset with no MET values).

        Notes
        -----
        If a time/bin is already flagged with a different cull code, this method
        will overwrite it with the new cull code. Consider implementing logic to
        preserve or combine cull codes if needed.

        Examples
        --------
        >>> # Flag all spin bins for MET=1000.5 as loose (cull=1)
        >>> goodtimes.remove_times(met=1000.5, bins=None, cull=CullCode.LOOSE)

        >>> # Flag spin bins 0-10 for MET=1000.5
        >>> goodtimes.remove_times(met=1000.5, bins=np.arange(11), cull=CullCode.LOOSE)

        >>> # Flag time range around a repoint (240s before/after)
        >>> repoint_time = 1000.0
        >>> goodtimes.remove_times(
        ...     met=(repoint_time - 240, repoint_time + 240),
        ...     cull=CullCode.LOOSE
        ... )

        >>> # Flag multiple specific METs, all bins
        >>> goodtimes.remove_times(
        ...     met=np.array([1000.5, 1001.5]), bins=None, cull=CullCode.LOOSE
        ... )
        """
        if cull == 0:
            raise ValueError("Cull code must be non-zero. Use 0 only for good times.")

        # Handle bins parameter
        if bins is None:
            # Flag all spin bins (0-89)
            bins_array = np.arange(N_SPIN_BINS)
        else:
            # Convert to array for consistent handling
            bins_array = np.atleast_1d(bins)
            if np.any((bins_array < 0) | (bins_array >= N_SPIN_BINS)):
                raise ValueError("Spin bins must be in range [0, 89]")

        met_values = self.coords["met"].values

        # Handle time range input (tuple of start, end)
        if isinstance(met, tuple) and len(met) == 2:
            met_start, met_end = met
            # Find all MET indices within the range
            in_range = (met_values >= met_start) & (met_values <= met_end)
            met_indices = np.nonzero(in_range)[0]
        else:
            # Convert met to array for consistent handling
            met_array = np.atleast_1d(met)

            if len(met_values) == 0:
                # Nothing can match; report it without indexing the empty
                # coordinate (which would raise IndexError instead).
                raise ValueError(
                    f"MET value(s) outside valid range: {met_array}. "
                    f"Goodtimes contains no MET values."
                )

            # Find indices of largest MET that is <= each met_val (vectorized)
            # searchsorted with side='right' gives first index where value
            # would go; subtract 1 to get the largest value <= met_val
            met_indices = np.searchsorted(met_values, met_array, side="right") - 1

            # A value is valid when it is not before the first MET (index -1)
            # and not after the last MET. The last MET itself is valid, which
            # matches the inclusive range quoted in the error message below.
            valid_mask = (met_indices >= 0) & (met_array <= met_values[-1])
            if len(met_array) == 0 or not np.all(valid_mask):
                raise ValueError(
                    f"MET value(s) outside valid range: {met_array[~valid_mask]}. "
                    f"Valid range: [{met_values[0]}, {met_values[-1]}]"
                )

        # Set cull_flags for all indices
        n_times = len(met_indices)
        n_bins = len(bins_array)
        logger.debug(
            f"Flagging {n_times} MET time(s) x {n_bins} spin bin(s) with "
            f"cull code {cull}"
        )
        # np.ix_ builds the outer product so every (met, bin) pair is flagged.
        self["cull_flags"].values[np.ix_(met_indices, bins_array)] = cull

    def get_good_intervals(self) -> np.ndarray:
        """
        Extract good time intervals for each MET timestamp.

        Creates an interval for each MET time that has good bins. Since ESA step
        changes at each MET, each MET gets its own interval(s).

        Good bins are never joined across the 89->0 boundary: if the good bins
        for a MET are not contiguous (e.g., bins 88,89,0,1), one interval is
        created for each contiguous run (here 0-1 and 88-89).

        Returns
        -------
        numpy.ndarray
            Structured array with dtype INTERVAL_DTYPE containing:
            - met_start: MET timestamp of interval
            - met_end: MET timestamp of interval (same as met_start)
            - spin_bin_low: Lowest good spin bin in interval
            - spin_bin_high: Highest good spin bin in interval
            - n_good_bins: Number of good bins
            - esa_step: ESA energy step for this MET

        Notes
        -----
        This is used for generating the Good Times output files per algorithm
        document Section 2.3.2.5.
        """
        logger.debug("Extracting good time intervals")
        met_values = self.coords["met"].values

        if len(met_values) == 0:
            logger.warning("No MET values found, returning empty intervals array")
            return np.array([], dtype=INTERVAL_DTYPE)

        cull_flags = self["cull_flags"].values
        esa_steps = self["esa_step"].values

        intervals: list[tuple] = []
        for met, pattern, esa_step in zip(
            met_values, cull_flags, esa_steps, strict=True
        ):
            # met_start == met_end: each interval describes a single MET
            self._add_intervals_for_pattern(intervals, met, met, pattern, esa_step)

        logger.info(f"Extracted {len(intervals)} good time intervals")
        return np.array(intervals, dtype=INTERVAL_DTYPE)

    def _add_intervals_for_pattern(
        self,
        intervals: list,
        met_start: float,
        met_end: float,
        pattern: np.ndarray,
        esa_step: int,
    ) -> None:
        """
        Append one interval per contiguous run of good bins in `pattern`.

        Parameters
        ----------
        intervals : list
            List to append interval tuples to. Each tuple is ordered like the
            fields of INTERVAL_DTYPE.
        met_start : float
            Start MET timestamp.
        met_end : float
            End MET timestamp.
        pattern : numpy.ndarray
            Cull flags pattern for spin bins (0 marks a good bin).
        esa_step : int
            ESA energy step for this MET.
        """
        good_bins = np.nonzero(pattern == 0)[0]
        if len(good_bins) == 0:
            return

        # Consecutive good bins differ by exactly 1; a larger step marks the
        # last element of one contiguous run, so split right after it. With no
        # gaps np.split returns the single full run.
        gaps = np.nonzero(np.diff(good_bins) > 1)[0]
        intervals.extend(
            (met_start, met_end, run[0], run[-1], len(run), esa_step)
            for run in np.split(good_bins, gaps + 1)
        )

    def get_cull_statistics(self) -> dict:
        """
        Calculate statistics on cull codes for diagnostics.

        Returns
        -------
        dict
            Dictionary with cull code statistics:
            - total_bins: Total number of MET x spin_bin combinations
            - good_bins: Number of bins with cull_flags=0
            - culled_bins: Number of bins with cull_flags>0
            - fraction_good: Fraction of bins that are good (0.0 when the
              dataset has no bins)
            - cull_code_counts: Dict mapping cull codes to counts
        """
        flags = self["cull_flags"].values
        total_bins = int(flags.size)
        good_bins = int(np.count_nonzero(flags == 0))
        culled_bins = total_bins - good_bins

        # Count occurrences of each non-zero cull code
        unique_codes, counts = np.unique(flags[flags > 0], return_counts=True)
        cull_code_counts = dict(
            zip(unique_codes.tolist(), counts.tolist(), strict=True)
        )

        return {
            "total_bins": total_bins,
            "good_bins": good_bins,
            "culled_bins": culled_bins,
            "fraction_good": good_bins / total_bins if total_bins > 0 else 0.0,
            "cull_code_counts": cull_code_counts,
        }

    def to_txt(self, output_path: Path) -> Path:
        """
        Write good times to text file in the format specified by algorithm document.

        Format per Section 2.3.2.5:
        pointing MET_start MET_end spin_bin_low spin_bin_high sensor esa_step
        [rate/sigma values...]

        One line is written per interval returned by `get_good_intervals()`;
        MET values are truncated to integers.

        Parameters
        ----------
        output_path : pathlib.Path
            Path where the text file should be written.

        Returns
        -------
        pathlib.Path
            Path to the created file.
        """
        logger.info(f"Writing good times to file: {output_path}")
        intervals = self.get_good_intervals()

        # Dataset-level values are the same on every line, so look them up once.
        pointing = self.attrs.get("pointing", 0)
        # NOTE(review): this fallback differs from the "Hi45"-style value that
        # from_l1a_de() stores in attrs["sensor"] - confirm the intended default.
        sensor = self.attrs.get("sensor", "45sensor")

        # Format:
        # pointing met_start met_end spin_bin_low spin_bin_high sensor esa_step
        # TODO: Add rate/sigma values for each ESA step
        lines = [
            f"{pointing:05d} "
            f"{int(interval['met_start'])} "
            f"{int(interval['met_end'])} "
            f"{interval['spin_bin_low']} "
            f"{interval['spin_bin_high']} "
            f"{sensor} "
            f"{interval['esa_step']}\n"
            for interval in intervals
        ]

        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(lines)

        logger.info(f"Wrote {len(intervals)} intervals to {output_path}")
        return output_path
"""Test coverage for imap_processing.hi.hi_goodtimes.py"""

import numpy as np
import pytest
import xarray as xr

from imap_processing.hi.hi_goodtimes import (
    INTERVAL_DTYPE,
    CullCode,
    Goodtimes,
)


@pytest.fixture
def mock_l1a_de():
    """Create a mock L1A Direct Event dataset for testing."""
    # Create 10 unique MET times, each appearing twice (paired)
    # Plus 2 unpaired MET times
    n_paired = 10

    # Paired METs: each appears twice
    paired_mets = np.arange(1000.0, 1000.0 + n_paired * 10, 10)
    met_seconds = np.repeat(paired_mets.astype(int), 2)
    met_subseconds = np.zeros(len(met_seconds))

    # Add unpaired METs
    unpaired_mets = np.array([2000.0, 3000.0])
    met_seconds = np.concatenate([met_seconds, unpaired_mets.astype(int)])
    met_subseconds = np.concatenate([met_subseconds, np.zeros(len(unpaired_mets))])

    # ESA step cycles through values
    esa_step = np.tile(np.arange(1, 11), len(met_seconds) // 10 + 1)[: len(met_seconds)]

    ds = xr.Dataset(
        {
            "meta_seconds": (["epoch"], met_seconds),
            "meta_subseconds": (["epoch"], met_subseconds),
            "esa_step": (["epoch"], esa_step.astype(np.uint8)),
        },
        attrs={
            "Logical_source": "imap_hi_l1a_45sensor-de",
            "Repointing": "repoint00042",
        },
    )
    return ds


@pytest.fixture
def goodtimes_instance(mock_l1a_de):
    """Create a Goodtimes instance for testing."""
    return Goodtimes.from_l1a_de(mock_l1a_de)


class TestCullCode:
    """Test suite for CullCode IntEnum."""

    def test_cull_code_values(self):
        """Test CullCode enum values."""
        assert CullCode.GOOD == 0
        assert CullCode.LOOSE == 1

    def test_cull_code_is_int(self):
        """Test that CullCode values are integers."""
        assert isinstance(CullCode.GOOD, int)
        assert isinstance(CullCode.LOOSE, int)


class TestGoodtimesFromL1aDe:
    """Test suite for Goodtimes.from_l1a_de() classmethod."""

    def test_from_l1a_de_basic(self, mock_l1a_de):
        """Test basic creation from L1A DE data."""
        gt = Goodtimes.from_l1a_de(mock_l1a_de)

        assert isinstance(gt, Goodtimes)
        assert isinstance(gt, xr.Dataset)

    def test_from_l1a_de_filters_unpaired_mets(self, mock_l1a_de):
        """Test that unpaired METs are filtered out."""
        gt = Goodtimes.from_l1a_de(mock_l1a_de)

        # Should have 10 paired METs (20 total entries -> 10 unique paired)
        assert len(gt.coords["met"]) == 10

    def test_from_l1a_de_dimensions(self, goodtimes_instance):
        """Test that dimensions are correct."""
        assert "met" in goodtimes_instance.dims
        assert "spin_bin" in goodtimes_instance.dims
        # Use .sizes for the name -> length lookup; mapping-style access on
        # Dataset.dims is deprecated in xarray.
        assert goodtimes_instance.sizes["spin_bin"] == 90

    def test_from_l1a_de_coordinates(self, goodtimes_instance):
        """Test that coordinates are set correctly."""
        assert "met" in goodtimes_instance.coords
        assert "spin_bin" in goodtimes_instance.coords

        # spin_bin should be 0-89
        np.testing.assert_array_equal(
            goodtimes_instance.coords["spin_bin"].values, np.arange(90)
        )

    def test_from_l1a_de_data_variables(self, goodtimes_instance):
        """Test that data variables are created."""
        assert "cull_flags" in goodtimes_instance.data_vars
        assert "esa_step" in goodtimes_instance.data_vars

    def test_from_l1a_de_cull_flags_initialized_to_zero(self, goodtimes_instance):
        """Test that cull_flags are initialized to 0 (good)."""
        assert np.all(goodtimes_instance["cull_flags"].values == 0)

    def test_from_l1a_de_cull_flags_shape(self, goodtimes_instance):
        """Test cull_flags array shape."""
        n_met = len(goodtimes_instance.coords["met"])
        assert goodtimes_instance["cull_flags"].shape == (n_met, 90)

    def test_from_l1a_de_esa_step_preserved(self, mock_l1a_de, goodtimes_instance):
        """Test that ESA step values are preserved for paired METs."""
        # Rebuild MET the same way from_l1a_de does (seconds + subseconds/1000)
        # and take the first occurrence of each paired MET
        met_all = (
            mock_l1a_de["meta_seconds"].values.astype(float)
            + mock_l1a_de["meta_subseconds"].values.astype(float) / 1000
        )
        _, first_indices, counts = np.unique(
            met_all, return_index=True, return_counts=True
        )
        paired_mask = counts == 2
        expected_esa_steps = mock_l1a_de["esa_step"].values[first_indices[paired_mask]]

        np.testing.assert_array_equal(
            goodtimes_instance["esa_step"].values, expected_esa_steps
        )

    def test_from_l1a_de_attributes(self, goodtimes_instance):
        """Test that attributes are set correctly."""
        assert goodtimes_instance.attrs["sensor"] == "Hi45"
        assert goodtimes_instance.attrs["pointing"] == 42


class TestRemoveTimes:
    """Test suite for Goodtimes.remove_times() method."""

    def test_remove_times_single_met_all_bins(self, goodtimes_instance):
        """Test flagging a single MET with all bins."""
        met_val = goodtimes_instance.coords["met"].values[0]
        goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE)

        # Check that all bins for the first MET are flagged
        assert np.all(goodtimes_instance["cull_flags"].values[0, :] == CullCode.LOOSE)

        # Check that other METs are still good
        assert np.all(goodtimes_instance["cull_flags"].values[1:, :] == CullCode.GOOD)

    def test_remove_times_single_met_specific_bins(self, goodtimes_instance):
        """Test flagging specific bins for a single MET."""
        met_val = goodtimes_instance.coords["met"].values[0]
        bins_to_flag = np.array([0, 1, 2, 10])
        goodtimes_instance.remove_times(
            met=met_val, bins=bins_to_flag, cull=CullCode.LOOSE
        )

        # Check that specified bins are flagged
        assert np.all(
            goodtimes_instance["cull_flags"].values[0, bins_to_flag] == CullCode.LOOSE
        )

        # Check that other bins are still good
        other_bins = np.setdiff1d(np.arange(90), bins_to_flag)
        assert np.all(
            goodtimes_instance["cull_flags"].values[0, other_bins] == CullCode.GOOD
        )

    def test_remove_times_multiple_mets(self, goodtimes_instance):
        """Test flagging multiple METs."""
        met_vals = goodtimes_instance.coords["met"].values[:3]
        goodtimes_instance.remove_times(met=met_vals, bins=None, cull=CullCode.LOOSE)

        # Check that first 3 METs are flagged
        assert np.all(goodtimes_instance["cull_flags"].values[:3, :] == CullCode.LOOSE)

        # Check that other METs are still good
        assert np.all(goodtimes_instance["cull_flags"].values[3:, :] == CullCode.GOOD)

    def test_remove_times_time_range(self, goodtimes_instance):
        """Test flagging a time range."""
        met_vals = goodtimes_instance.coords["met"].values
        met_start = met_vals[2]
        met_end = met_vals[5]

        goodtimes_instance.remove_times(
            met=(met_start, met_end), bins=None, cull=CullCode.LOOSE
        )

        # Check that METs 2-5 are flagged
        assert np.all(goodtimes_instance["cull_flags"].values[2:6, :] == CullCode.LOOSE)

        # Check that other METs are still good
        assert np.all(goodtimes_instance["cull_flags"].values[:2, :] == CullCode.GOOD)
        assert np.all(goodtimes_instance["cull_flags"].values[6:, :] == CullCode.GOOD)

    def test_remove_times_invalid_cull_code_zero(self, goodtimes_instance):
        """Test that cull code 0 raises ValueError."""
        met_val = goodtimes_instance.coords["met"].values[0]
        with pytest.raises(ValueError, match="Cull code must be non-zero"):
            goodtimes_instance.remove_times(met=met_val, cull=0)

    def test_remove_times_invalid_bin_indices(self, goodtimes_instance):
        """Test that invalid bin indices raise ValueError."""
        met_val = goodtimes_instance.coords["met"].values[0]

        # Test bin < 0
        with pytest.raises(ValueError, match="Spin bins must be in range"):
            goodtimes_instance.remove_times(met=met_val, bins=np.array([-1, 0]))

        # Test bin >= 90
        with pytest.raises(ValueError, match="Spin bins must be in range"):
            goodtimes_instance.remove_times(met=met_val, bins=np.array([89, 90]))

    def test_remove_times_met_out_of_range(self, goodtimes_instance):
        """Test that MET outside valid range raises ValueError."""
        met_vals = goodtimes_instance.coords["met"].values
        met_out_of_range = met_vals[-1] + 1000

        with pytest.raises(ValueError, match="MET value\\(s\\) outside valid range"):
            goodtimes_instance.remove_times(met=met_out_of_range)

    def test_remove_times_overwrites_existing_cull(self, goodtimes_instance):
        """Test that new cull code overwrites existing one."""
        met_val = goodtimes_instance.coords["met"].values[0]

        # Flag with LOOSE
        goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE)
        assert np.all(goodtimes_instance["cull_flags"].values[0, :] == CullCode.LOOSE)

        # Overwrite with a different cull code
        goodtimes_instance.remove_times(met=met_val, bins=None, cull=2)
        assert np.all(goodtimes_instance["cull_flags"].values[0, :] == 2)


class TestGetGoodIntervals:
    """Test suite for Goodtimes.get_good_intervals() method."""

    def test_get_good_intervals_all_good(self, goodtimes_instance):
        """Test getting intervals when all times are good."""
        intervals = goodtimes_instance.get_good_intervals()

        # Should have one interval per MET
        n_met = len(goodtimes_instance.coords["met"])
        assert len(intervals) == n_met

        # Check interval structure
        assert intervals.dtype == INTERVAL_DTYPE

    def test_get_good_intervals_structure(self, goodtimes_instance):
        """Test interval structure and field names."""
        intervals = goodtimes_instance.get_good_intervals()

        # Check that all fields exist
        assert "met_start" in intervals.dtype.names
        assert "met_end" in intervals.dtype.names
        assert "spin_bin_low" in intervals.dtype.names
        assert "spin_bin_high" in intervals.dtype.names
        assert "n_good_bins" in intervals.dtype.names
        assert "esa_step" in intervals.dtype.names

    def test_get_good_intervals_all_good_values(self, goodtimes_instance):
        """Test interval values when all bins are good."""
        intervals = goodtimes_instance.get_good_intervals()

        # When all bins are good, should have bins 0-89
        for interval in intervals:
            assert interval["spin_bin_low"] == 0
            assert interval["spin_bin_high"] == 89
            assert interval["n_good_bins"] == 90
            assert interval["met_start"] == interval["met_end"]

    def test_get_good_intervals_with_culled_bins(self, goodtimes_instance):
        """Test intervals when some bins are culled."""
        # Flag bins 0-20 for first MET
        met_val = goodtimes_instance.coords["met"].values[0]
        goodtimes_instance.remove_times(
            met=met_val, bins=np.arange(21), cull=CullCode.LOOSE
        )

        intervals = goodtimes_instance.get_good_intervals()

        # First interval should only have bins 21-89
        assert intervals[0]["spin_bin_low"] == 21
        assert intervals[0]["spin_bin_high"] == 89
        assert intervals[0]["n_good_bins"] == 69

    def test_get_good_intervals_with_gaps(self, goodtimes_instance):
        """Test intervals when good bins are split by a culled gap."""
        # Flag bins 20-70 for first MET, leaving bins 0-19 and 71-89 as good
        met_val = goodtimes_instance.coords["met"].values[0]
        goodtimes_instance.remove_times(
            met=met_val, bins=np.arange(20, 71), cull=CullCode.LOOSE
        )

        intervals = goodtimes_instance.get_good_intervals()

        # Should create 2 intervals for the first MET (bins split by gap)
        # Plus 9 more intervals for the remaining METs
        assert len(intervals) == 11

        # First two intervals should be for the same MET
        assert intervals[0]["met_start"] == intervals[1]["met_start"]

        # Check the two segments
        assert intervals[0]["spin_bin_low"] == 0
        assert intervals[0]["spin_bin_high"] == 19
        assert intervals[1]["spin_bin_low"] == 71
        assert intervals[1]["spin_bin_high"] == 89

    def test_get_good_intervals_all_bins_culled(self, goodtimes_instance):
        """Test intervals when all bins are culled for a MET."""
        # Flag all bins for first MET
        met_val = goodtimes_instance.coords["met"].values[0]
        goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE)

        intervals = goodtimes_instance.get_good_intervals()

        # Should have 9 intervals (one per good MET, excluding the first)
        assert len(intervals) == 9

        # First interval should be for the second MET
        assert intervals[0]["met_start"] == goodtimes_instance.coords["met"].values[1]

    def test_get_good_intervals_empty(self):
        """Test intervals with empty Goodtimes object."""
        # Create empty dataset
        gt = Goodtimes(
            data_vars={
                "cull_flags": xr.DataArray(
                    np.zeros((0, 90), dtype=np.uint8), dims=["met", "spin_bin"]
                ),
                "esa_step": xr.DataArray(np.array([], dtype=np.uint8), dims=["met"]),
            },
            coords={"met": np.array([]), "spin_bin": np.arange(90)},
            attrs={"sensor": "Hi45", "pointing": 0},
        )

        intervals = gt.get_good_intervals()
        assert len(intervals) == 0

    def test_get_good_intervals_esa_step_included(self, goodtimes_instance):
        """Test that ESA step is included in intervals."""
        intervals = goodtimes_instance.get_good_intervals()

        # With nothing culled there is exactly one interval per MET, so the
        # interval index lines up with the MET index
        for i, interval in enumerate(intervals):
            expected_esa_step = goodtimes_instance["esa_step"].values[i]
            assert interval["esa_step"] == expected_esa_step


class TestGetCullStatistics:
    """Test suite for Goodtimes.get_cull_statistics() method."""

    def test_get_cull_statistics_all_good(self, goodtimes_instance):
        """Test statistics when all bins are good."""
        stats = goodtimes_instance.get_cull_statistics()

        total_bins = len(goodtimes_instance.coords["met"]) * 90
        assert stats["total_bins"] == total_bins
        assert stats["good_bins"] == total_bins
        assert stats["culled_bins"] == 0
        assert stats["fraction_good"] == 1.0
        assert stats["cull_code_counts"] == {}

    def test_get_cull_statistics_with_culls(self, goodtimes_instance):
        """Test statistics after culling some bins."""
        # Flag first MET, all bins
        met_val = goodtimes_instance.coords["met"].values[0]
        goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE)

        stats = goodtimes_instance.get_cull_statistics()

        total_bins = len(goodtimes_instance.coords["met"]) * 90
        assert stats["total_bins"] == total_bins
        assert stats["good_bins"] == total_bins - 90
        assert stats["culled_bins"] == 90
        assert stats["fraction_good"] == (total_bins - 90) / total_bins
        assert stats["cull_code_counts"][CullCode.LOOSE] == 90

    def test_get_cull_statistics_multiple_cull_codes(self, goodtimes_instance):
        """Test statistics with multiple cull codes."""
        met_vals = goodtimes_instance.coords["met"].values

        # Flag first MET with LOOSE
        goodtimes_instance.remove_times(met=met_vals[0], bins=None, cull=CullCode.LOOSE)

        # Flag second MET with code 2
        goodtimes_instance.remove_times(met=met_vals[1], bins=None, cull=2)

        stats = goodtimes_instance.get_cull_statistics()

        assert stats["culled_bins"] == 180
        assert stats["cull_code_counts"][CullCode.LOOSE] == 90
        assert stats["cull_code_counts"][2] == 90


class TestToTxt:
    """Test suite for Goodtimes.to_txt() method."""

    def test_to_txt_creates_file(self, goodtimes_instance, tmp_path):
        """Test that to_txt creates a file."""
        output_path = tmp_path / "goodtimes.txt"
        result = goodtimes_instance.to_txt(output_path)

        assert result == output_path
        assert output_path.exists()

    def test_to_txt_format(self, goodtimes_instance, tmp_path):
        """Test the format of the output file."""
        output_path = tmp_path / "goodtimes.txt"
        goodtimes_instance.to_txt(output_path)

        with open(output_path) as f:
            lines = f.readlines()

        # Should have one line per interval (10 METs, all good)
        assert len(lines) == 10

        # Check format of first line
        parts = lines[0].strip().split()
        assert len(parts) == 7
        assert parts[0] == "00042"  # pointing
        assert parts[5] == "Hi45"  # sensor

    def test_to_txt_values(self, goodtimes_instance, tmp_path):
        """Test the values in the output file."""
        output_path = tmp_path / "goodtimes.txt"
        goodtimes_instance.to_txt(output_path)

        with open(output_path) as f:
            line = f.readline()

        parts = line.strip().split()
        pointing, met_start, met_end, bin_low, bin_high, sensor, esa_step = parts

        assert pointing == "00042"
        assert int(met_start) == int(goodtimes_instance.coords["met"].values[0])
        assert int(met_end) == int(goodtimes_instance.coords["met"].values[0])
        assert int(bin_low) == 0
        assert int(bin_high) == 89
        assert sensor == "Hi45"
        assert int(esa_step) == goodtimes_instance["esa_step"].values[0]

    def test_to_txt_with_culled_bins(self, goodtimes_instance, tmp_path):
        """Test output when some bins are culled."""
        # Flag bins 0-20 for first MET
        met_val = goodtimes_instance.coords["met"].values[0]
        goodtimes_instance.remove_times(
            met=met_val, bins=np.arange(21), cull=CullCode.LOOSE
        )

        output_path = tmp_path / "goodtimes.txt"
        goodtimes_instance.to_txt(output_path)

        with open(output_path) as f:
            first_line = f.readline()

        parts = first_line.strip().split()
        bin_low = int(parts[3])
        bin_high = int(parts[4])

        # First interval should only include bins 21-89
        assert bin_low == 21
        assert bin_high == 89

    def test_to_txt_with_gaps(self, goodtimes_instance, tmp_path):
        """Test output when bins have gaps."""
        # Flag bins 20-70, leaving 0-19 and 71-89 as good
        met_val = goodtimes_instance.coords["met"].values[0]
        goodtimes_instance.remove_times(
            met=met_val, bins=np.arange(20, 71), cull=CullCode.LOOSE
        )

        output_path = tmp_path / "goodtimes.txt"
        goodtimes_instance.to_txt(output_path)

        with open(output_path) as f:
            lines = f.readlines()

        # Should have 11 lines (2 for first MET, 1 for each of 9 remaining METs)
        assert len(lines) == 11

        # First two lines should be for same MET
        parts1 = lines[0].strip().split()
        parts2 = lines[1].strip().split()
        assert parts1[1] == parts2[1]  # Same met_start

        # Check bin ranges
        assert int(parts1[3]) == 0
        assert int(parts1[4]) == 19
        assert int(parts2[3]) == 71
        assert int(parts2[4]) == 89


class TestIntervalDtype:
    """Test suite for INTERVAL_DTYPE."""

    def test_interval_dtype_fields(self):
        """Test that INTERVAL_DTYPE has correct fields."""
        field_names = INTERVAL_DTYPE.names
        assert "met_start" in field_names
        assert "met_end" in field_names
        assert "spin_bin_low" in field_names
        assert "spin_bin_high" in field_names
        assert "n_good_bins" in field_names
        assert "esa_step" in field_names

    def test_interval_dtype_types(self):
        """Test that INTERVAL_DTYPE has correct field types."""
        assert INTERVAL_DTYPE["met_start"] == np.float64
        assert INTERVAL_DTYPE["met_end"] == np.float64
        assert INTERVAL_DTYPE["spin_bin_low"] == np.int32
        assert INTERVAL_DTYPE["spin_bin_high"] == np.int32
        assert INTERVAL_DTYPE["n_good_bins"] == np.int32
        assert INTERVAL_DTYPE["esa_step"] == np.uint8
b/imap_processing/hi/hi_goodtimes.py @@ -4,7 +4,6 @@ import re from enum import IntEnum from pathlib import Path -from typing import Self import numpy as np import xarray as xr @@ -33,15 +32,106 @@ class CullCode(IntEnum): LOOSE = 1 -# mypy doesn't like subclassing Dataset -class Goodtimes(xr.Dataset): # type: ignore[misc] +def create_goodtimes_dataset(l1a_de: xr.Dataset) -> xr.Dataset: """ - IMAP-Hi Good Times data structure. + Create goodtimes dataset from L1A Direct Event data. + + Initializes all times and spin bins as good (cull_flags=0) for complete + 8-spin periods. Since we receive one packet every 4 spins but only record + MET every 8 spins, we expect MET values to appear in pairs. Only MET values + that appear as duplicates (pairs) are included, as single occurrences indicate + incomplete 8-spin periods. + + Parameters + ---------- + l1a_de : xarray.Dataset + L1A direct event data for this pointing. Used to extract MET timestamps + for each 8-spin interval. + + Returns + ------- + xarray.Dataset + Initialized goodtimes dataset with cull_flags set to 0 (all good) for + complete 8-spin periods only. Access goodtimes methods via the + .goodtimes accessor (e.g., dataset.goodtimes.remove_times()). 
+ """ + logger.info("Creating Goodtimes from L1A Direct Event data") + + # Extract MET times from packet metadata + # Each MET represents one 8-spin histogram packet interval + # Format: seconds + subseconds/1000 + met_all = ( + l1a_de["meta_seconds"].astype(float) + + l1a_de["meta_subseconds"].astype(float) / 1000 + ) + logger.debug(f"Extracted {len(met_all)} total MET entries from L1A DE data") + + # Find unique MET values, their counts, and indices of first occurrences + unique_mets, first_indices, counts = np.unique( + met_all.values, return_index=True, return_counts=True + ) + logger.debug(f"Found {len(unique_mets)} unique MET values") + + # Keep only MET values that appear as pairs (count == 2) + paired_mask = counts == 2 + first_occurrence_indices = first_indices[paired_mask] + + n_paired = int(np.sum(paired_mask)) + n_unpaired = len(unique_mets) - n_paired + logger.info( + f"Filtered to {n_paired} complete 8-spin periods " + f"(excluded {n_unpaired} incomplete periods)" + ) + + # Extract data for paired METs only + met = met_all.isel(epoch=first_occurrence_indices) + esa_step = l1a_de["esa_step"].isel(epoch=first_occurrence_indices) + + # Create coordinates + coords = { + "met": met.values, + "spin_bin": np.arange(90), + } + + # Create data variables + # Initialize cull_flags - all good (0) by default + # Shape: (n_met_timestamps, 90 spin_bins) + # Per alg doc Section 2.2.4: 90-element arrays, one per histogram packet + data_vars = { + "cull_flags": xr.DataArray( + np.zeros((len(met), 90), dtype=np.uint8), + dims=["met", "spin_bin"], + ), + "esa_step": esa_step, + } + + # Create attributes + sensor_number = parse_sensor_number(l1a_de.attrs["Logical_source"]) + match = re.match(r"repoint(?P<pointing_num>\d{5})", l1a_de.attrs["Repointing"]) + if not match: + raise ValueError( + f"Unable to parse sensor number from l1a_de Repointing " + f"attribute: {l1a_de.attrs['Repointing']}" + ) + attrs = { + "sensor": f"Hi{sensor_number}", + "pointing": int(match["pointing_num"]), + } 
+ + return xr.Dataset(data_vars, coords, attrs) + - Tracks good/bad time intervals for a single Pointing based on validation - checks defined in the IMAP-Hi Algorithm Document Section 2.2.4 and 2.3.2. +@xr.register_dataset_accessor("goodtimes") +class GoodtimesAccessor: + """ + Extend xarray.Dataset with accessor for IMAP-Hi Good Times operations. + + Provides methods to track and manage good/bad time intervals for a single + Pointing based on validation checks defined in the IMAP-Hi Algorithm + Document Section 2.2.4 and 2.3.2. - The data structure maintains a cull_flags array initialized to all zeros (good). + The accessor operates on xr.Dataset objects created by create_goodtimes_dataset(). + The dataset maintains a cull_flags array initialized to all zeros (good). As bad times are identified by validation algorithms, they are flagged via the `remove_times()` method with a non-zero cull code. @@ -49,7 +139,7 @@ class Goodtimes(xr.Dataset): # type: ignore[misc] * 0 : Good time (default) * 1-N : Bad time, with specific cull reason code - xarray.Dataset structure: + Expected xarray.Dataset structure: * Dimensions: * met : int Number of MET timestamps (one per 8-spin histogram packet, ~90 per pointing) @@ -70,95 +160,22 @@ class Goodtimes(xr.Dataset): # type: ignore[misc] Sensor identifier ('45sensor' or '90sensor') * pointing : int Pointing number for this dataset - """ - - @classmethod - def from_l1a_de(cls, l1a_de: xr.Dataset) -> Self: - """ - Create Goodtimes object from L1A Direct Event data. - - Initializes all times and spin bins as good (cull_flags=0) for complete - 8-spin periods. Since we receive one packet every 4 spins but only record - MET every 8 spins, we expect MET values to appear in pairs. Only MET values - that appear as duplicates (pairs) are included, as single occurrences indicate - incomplete 8-spin periods. - Parameters - ---------- - l1a_de : xarray.Dataset - L1A direct event data for this pointing. 
Used to extract MET timestamps - for each 8-spin interval. + Parameters + ---------- + xarray_obj : xarray.Dataset + The xarray Dataset to wrap with goodtimes accessor functionality. - Returns - ------- - Goodtimes - Initialized Goodtimes object with cull_flags set to 0 (all good) for - complete 8-spin periods only. - """ - logger.info("Creating Goodtimes from L1A Direct Event data") - - # Extract MET times from packet metadata - # Each MET represents one 8-spin histogram packet interval - # Format: seconds + subseconds/1000 - met_all = ( - l1a_de["meta_seconds"].astype(float) - + l1a_de["meta_subseconds"].astype(float) / 1000 - ) - logger.debug(f"Extracted {len(met_all)} total MET entries from L1A DE data") - - # Find unique MET values, their counts, and indices of first occurrences - unique_mets, first_indices, counts = np.unique( - met_all.values, return_index=True, return_counts=True - ) - logger.debug(f"Found {len(unique_mets)} unique MET values") - - # Keep only MET values that appear as pairs (count == 2) - paired_mask = counts == 2 - first_occurrence_indices = first_indices[paired_mask] - - n_paired = int(np.sum(paired_mask)) - n_unpaired = len(unique_mets) - n_paired - logger.info( - f"Filtered to {n_paired} complete 8-spin periods " - f"(excluded {n_unpaired} incomplete periods)" - ) - - # Extract data for paired METs only - met = met_all.isel(epoch=first_occurrence_indices) - esa_step = l1a_de["esa_step"].isel(epoch=first_occurrence_indices) - - # Create coordinates - coords = { - "met": met.values, - "spin_bin": np.arange(90), - } - - # Create data variables - # Initialize cull_flags - all good (0) by default - # Shape: (n_met_timestamps, 90 spin_bins) - # Per alg doc Section 2.2.4: 90-element arrays, one per histogram packet - data_vars = { - "cull_flags": xr.DataArray( - np.zeros((len(met), 90), dtype=np.uint8), - dims=["met", "spin_bin"], - ), - "esa_step": esa_step, - } - - # Create attributes - sensor_number = 
parse_sensor_number(l1a_de.attrs["Logical_source"]) - match = re.match(r"repoint(?P<pointing_num>\d{5})", l1a_de.attrs["Repointing"]) - if not match: - raise ValueError( - f"Unable to parse sensor number from l1a_de Repointing " - f"attribute: {l1a_de.attrs['Repointing']}" - ) - attrs = { - "sensor": f"Hi{sensor_number}", - "pointing": int(match["pointing_num"]), - } + Examples + -------- + >>> gt_dataset = create_goodtimes_dataset(l1a_de) + >>> gt_dataset.goodtimes.remove_times(met=1000.5, cull=CullCode.LOOSE) + >>> intervals = gt_dataset.goodtimes.get_good_intervals() + """ - return cls(data_vars, coords, attrs) + def __init__(self, xarray_obj: xr.Dataset) -> None: + """Initialize the accessor with an xarray Dataset.""" + self._obj = xarray_obj def remove_times( self, @@ -232,7 +249,7 @@ def remove_times( if np.any((bins_array < 0) | (bins_array >= 90)): raise ValueError("Spin bins must be in range [0, 89]") - met_values = self.coords["met"].values + met_values = self._obj.coords["met"].values # Handle time range input (tuple of start, end) if isinstance(met, tuple) and len(met) == 2: @@ -266,7 +283,7 @@ def remove_times( f"Flagging {n_times} MET time(s) x {n_bins} spin bin(s) with " f"cull code {cull}" ) - self["cull_flags"].values[np.ix_(met_indices, bins_array)] = cull + self._obj["cull_flags"].values[np.ix_(met_indices, bins_array)] = cull def get_good_intervals(self) -> np.ndarray: """ @@ -296,9 +313,9 @@ def get_good_intervals(self) -> np.ndarray: """ logger.debug("Extracting good time intervals") intervals: list[np.void] = [] - met_values = self.coords["met"].values - cull_flags = self["cull_flags"].values - esa_steps = self["esa_step"].values + met_values = self._obj.coords["met"].values + cull_flags = self._obj["cull_flags"].values + esa_steps = self._obj["esa_step"].values if len(met_values) == 0: logger.warning("No MET values found, returning empty intervals array") @@ -404,13 +421,14 @@ def get_cull_statistics(self) -> dict: - fraction_good: Fraction of bins that 
are good - cull_code_counts: Dict mapping cull codes to counts """ - total_bins = self["cull_flags"].size - good_bins = int(np.sum(self["cull_flags"].values == 0)) + total_bins = self._obj["cull_flags"].size + good_bins = int(np.sum(self._obj["cull_flags"].values == 0)) culled_bins = total_bins - good_bins # Count occurrences of each cull code unique_codes, counts = np.unique( - self["cull_flags"].values[self["cull_flags"].values > 0], return_counts=True + self._obj["cull_flags"].values[self._obj["cull_flags"].values > 0], + return_counts=True, ) cull_code_counts = dict( zip(unique_codes.tolist(), counts.tolist(), strict=False) @@ -424,7 +442,7 @@ def get_cull_statistics(self) -> dict: "cull_code_counts": cull_code_counts, } - def to_txt(self, output_path: Path) -> Path: + def write_txt(self, output_path: Path) -> Path: """ Write good times to text file in the format specified by algorithm document. @@ -447,8 +465,8 @@ def to_txt(self, output_path: Path) -> Path: with open(output_path, "w") as f: for interval in intervals: - pointing = self.attrs.get("pointing", 0) - sensor = self.attrs.get("sensor", "45sensor") + pointing = self._obj.attrs.get("pointing", 0) + sensor = self._obj.attrs.get("sensor", "45sensor") # Format: # pointing met_start met_end spin_bin_low spin_bin_high sensor esa_step diff --git a/imap_processing/tests/hi/test_hi_goodtimes.py b/imap_processing/tests/hi/test_hi_goodtimes.py index 52fbb96cd..8df4011b2 100644 --- a/imap_processing/tests/hi/test_hi_goodtimes.py +++ b/imap_processing/tests/hi/test_hi_goodtimes.py @@ -7,7 +7,7 @@ from imap_processing.hi.hi_goodtimes import ( INTERVAL_DTYPE, CullCode, - Goodtimes, + create_goodtimes_dataset, ) @@ -47,8 +47,8 @@ def mock_l1a_de(): @pytest.fixture def goodtimes_instance(mock_l1a_de): - """Create a Goodtimes instance for testing.""" - return Goodtimes.from_l1a_de(mock_l1a_de) + """Create a goodtimes dataset for testing.""" + return create_goodtimes_dataset(mock_l1a_de) class TestCullCode: @@ -70,14 
+70,13 @@ class TestGoodtimesFromL1aDe: def test_from_l1a_de_basic(self, mock_l1a_de): """Test basic creation from L1A DE data.""" - gt = Goodtimes.from_l1a_de(mock_l1a_de) + gt = create_goodtimes_dataset(mock_l1a_de) - assert isinstance(gt, Goodtimes) assert isinstance(gt, xr.Dataset) def test_from_l1a_de_filters_unpaired_mets(self, mock_l1a_de): """Test that unpaired METs are filtered out.""" - gt = Goodtimes.from_l1a_de(mock_l1a_de) + gt = create_goodtimes_dataset(mock_l1a_de) # Should have 10 paired METs (20 total entries -> 10 unique paired) assert len(gt.coords["met"]) == 10 @@ -138,7 +137,9 @@ class TestRemoveTimes: def test_remove_times_single_met_all_bins(self, goodtimes_instance): """Test flagging a single MET with all bins.""" met_val = goodtimes_instance.coords["met"].values[0] - goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE) + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) # Check that all bins for the first MET are flagged assert np.all(goodtimes_instance["cull_flags"].values[0, :] == CullCode.LOOSE) @@ -150,7 +151,7 @@ def test_remove_times_single_met_specific_bins(self, goodtimes_instance): """Test flagging specific bins for a single MET.""" met_val = goodtimes_instance.coords["met"].values[0] bins_to_flag = np.array([0, 1, 2, 10]) - goodtimes_instance.remove_times( + goodtimes_instance.goodtimes.remove_times( met=met_val, bins=bins_to_flag, cull=CullCode.LOOSE ) @@ -168,7 +169,9 @@ def test_remove_times_single_met_specific_bins(self, goodtimes_instance): def test_remove_times_multiple_mets(self, goodtimes_instance): """Test flagging multiple METs.""" met_vals = goodtimes_instance.coords["met"].values[:3] - goodtimes_instance.remove_times(met=met_vals, bins=None, cull=CullCode.LOOSE) + goodtimes_instance.goodtimes.remove_times( + met=met_vals, bins=None, cull=CullCode.LOOSE + ) # Check that first 3 METs are flagged assert np.all(goodtimes_instance["cull_flags"].values[:3, :] 
== CullCode.LOOSE) @@ -182,7 +185,7 @@ def test_remove_times_time_range(self, goodtimes_instance): met_start = met_vals[2] met_end = met_vals[5] - goodtimes_instance.remove_times( + goodtimes_instance.goodtimes.remove_times( met=(met_start, met_end), bins=None, cull=CullCode.LOOSE ) @@ -197,7 +200,7 @@ def test_remove_times_invalid_cull_code_zero(self, goodtimes_instance): """Test that cull code 0 raises ValueError.""" met_val = goodtimes_instance.coords["met"].values[0] with pytest.raises(ValueError, match="Cull code must be non-zero"): - goodtimes_instance.remove_times(met=met_val, cull=0) + goodtimes_instance.goodtimes.remove_times(met=met_val, cull=0) def test_remove_times_invalid_bin_indices(self, goodtimes_instance): """Test that invalid bin indices raise ValueError.""" @@ -205,11 +208,15 @@ def test_remove_times_invalid_bin_indices(self, goodtimes_instance): # Test bin < 0 with pytest.raises(ValueError, match="Spin bins must be in range"): - goodtimes_instance.remove_times(met=met_val, bins=np.array([-1, 0])) + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.array([-1, 0]) + ) # Test bin >= 90 with pytest.raises(ValueError, match="Spin bins must be in range"): - goodtimes_instance.remove_times(met=met_val, bins=np.array([89, 90])) + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.array([89, 90]) + ) def test_remove_times_met_out_of_range(self, goodtimes_instance): """Test that MET outside valid range raises ValueError.""" @@ -217,18 +224,20 @@ def test_remove_times_met_out_of_range(self, goodtimes_instance): met_out_of_range = met_vals[-1] + 1000 with pytest.raises(ValueError, match="MET value\\(s\\) outside valid range"): - goodtimes_instance.remove_times(met=met_out_of_range) + goodtimes_instance.goodtimes.remove_times(met=met_out_of_range) def test_remove_times_overwrites_existing_cull(self, goodtimes_instance): """Test that new cull code overwrites existing one.""" met_val = goodtimes_instance.coords["met"].values[0] # 
Flag with LOOSE - goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE) + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) assert np.all(goodtimes_instance["cull_flags"].values[0, :] == CullCode.LOOSE) # Overwrite with a different cull code - goodtimes_instance.remove_times(met=met_val, bins=None, cull=2) + goodtimes_instance.goodtimes.remove_times(met=met_val, bins=None, cull=2) assert np.all(goodtimes_instance["cull_flags"].values[0, :] == 2) @@ -237,7 +246,7 @@ class TestGetGoodIntervals: def test_get_good_intervals_all_good(self, goodtimes_instance): """Test getting intervals when all times are good.""" - intervals = goodtimes_instance.get_good_intervals() + intervals = goodtimes_instance.goodtimes.get_good_intervals() # Should have one interval per MET n_met = len(goodtimes_instance.coords["met"]) @@ -248,7 +257,7 @@ def test_get_good_intervals_all_good(self, goodtimes_instance): def test_get_good_intervals_structure(self, goodtimes_instance): """Test interval structure and field names.""" - intervals = goodtimes_instance.get_good_intervals() + intervals = goodtimes_instance.goodtimes.get_good_intervals() # Check that all fields exist assert "met_start" in intervals.dtype.names @@ -260,7 +269,7 @@ def test_get_good_intervals_structure(self, goodtimes_instance): def test_get_good_intervals_all_good_values(self, goodtimes_instance): """Test interval values when all bins are good.""" - intervals = goodtimes_instance.get_good_intervals() + intervals = goodtimes_instance.goodtimes.get_good_intervals() # When all bins are good, should have bins 0-89 for interval in intervals: @@ -273,11 +282,11 @@ def test_get_good_intervals_with_culled_bins(self, goodtimes_instance): """Test intervals when some bins are culled.""" # Flag bins 0-20 for first MET met_val = goodtimes_instance.coords["met"].values[0] - goodtimes_instance.remove_times( + goodtimes_instance.goodtimes.remove_times( met=met_val, 
bins=np.arange(21), cull=CullCode.LOOSE ) - intervals = goodtimes_instance.get_good_intervals() + intervals = goodtimes_instance.goodtimes.get_good_intervals() # First interval should only have bins 21-89 assert intervals[0]["spin_bin_low"] == 21 @@ -288,11 +297,11 @@ def test_get_good_intervals_with_gaps(self, goodtimes_instance): """Test intervals when good bins have gaps (wraparound).""" # Flag bins 20-70 for first MET, leaving bins 0-19 and 71-89 as good met_val = goodtimes_instance.coords["met"].values[0] - goodtimes_instance.remove_times( + goodtimes_instance.goodtimes.remove_times( met=met_val, bins=np.arange(20, 71), cull=CullCode.LOOSE ) - intervals = goodtimes_instance.get_good_intervals() + intervals = goodtimes_instance.goodtimes.get_good_intervals() # Should create 2 intervals for the first MET (bins split by gap) # Plus 9 more intervals for the remaining METs @@ -311,9 +320,11 @@ def test_get_good_intervals_all_bins_culled(self, goodtimes_instance): """Test intervals when all bins are culled for a MET.""" # Flag all bins for first MET met_val = goodtimes_instance.coords["met"].values[0] - goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE) + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) - intervals = goodtimes_instance.get_good_intervals() + intervals = goodtimes_instance.goodtimes.get_good_intervals() # Should have 9 intervals (one per good MET, excluding the first) assert len(intervals) == 9 @@ -322,9 +333,9 @@ def test_get_good_intervals_all_bins_culled(self, goodtimes_instance): assert intervals[0]["met_start"] == goodtimes_instance.coords["met"].values[1] def test_get_good_intervals_empty(self): - """Test intervals with empty Goodtimes object.""" + """Test intervals with empty goodtimes dataset.""" # Create empty dataset - gt = Goodtimes( + gt = xr.Dataset( data_vars={ "cull_flags": xr.DataArray( np.zeros((0, 90), dtype=np.uint8), dims=["met", "spin_bin"] @@ -335,12 +346,12 
@@ def test_get_good_intervals_empty(self): attrs={"sensor": "Hi45", "pointing": 0}, ) - intervals = gt.get_good_intervals() + intervals = gt.goodtimes.get_good_intervals() assert len(intervals) == 0 def test_get_good_intervals_esa_step_included(self, goodtimes_instance): """Test that ESA step is included in intervals.""" - intervals = goodtimes_instance.get_good_intervals() + intervals = goodtimes_instance.goodtimes.get_good_intervals() # Check that each interval has an ESA step for i, interval in enumerate(intervals): @@ -353,7 +364,7 @@ class TestGetCullStatistics: def test_get_cull_statistics_all_good(self, goodtimes_instance): """Test statistics when all bins are good.""" - stats = goodtimes_instance.get_cull_statistics() + stats = goodtimes_instance.goodtimes.get_cull_statistics() total_bins = len(goodtimes_instance.coords["met"]) * 90 assert stats["total_bins"] == total_bins @@ -366,9 +377,11 @@ def test_get_cull_statistics_with_culls(self, goodtimes_instance): """Test statistics after culling some bins.""" # Flag first MET, all bins met_val = goodtimes_instance.coords["met"].values[0] - goodtimes_instance.remove_times(met=met_val, bins=None, cull=CullCode.LOOSE) + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) - stats = goodtimes_instance.get_cull_statistics() + stats = goodtimes_instance.goodtimes.get_cull_statistics() total_bins = len(goodtimes_instance.coords["met"]) * 90 assert stats["total_bins"] == total_bins @@ -382,12 +395,14 @@ def test_get_cull_statistics_multiple_cull_codes(self, goodtimes_instance): met_vals = goodtimes_instance.coords["met"].values # Flag first MET with LOOSE - goodtimes_instance.remove_times(met=met_vals[0], bins=None, cull=CullCode.LOOSE) + goodtimes_instance.goodtimes.remove_times( + met=met_vals[0], bins=None, cull=CullCode.LOOSE + ) # Flag second MET with code 2 - goodtimes_instance.remove_times(met=met_vals[1], bins=None, cull=2) + 
goodtimes_instance.goodtimes.remove_times(met=met_vals[1], bins=None, cull=2) - stats = goodtimes_instance.get_cull_statistics() + stats = goodtimes_instance.goodtimes.get_cull_statistics() assert stats["culled_bins"] == 180 assert stats["cull_code_counts"][CullCode.LOOSE] == 90 @@ -400,7 +415,7 @@ class TestToTxt: def test_to_txt_creates_file(self, goodtimes_instance, tmp_path): """Test that to_txt creates a file.""" output_path = tmp_path / "goodtimes.txt" - result = goodtimes_instance.to_txt(output_path) + result = goodtimes_instance.goodtimes.write_txt(output_path) assert result == output_path assert output_path.exists() @@ -408,7 +423,7 @@ def test_to_txt_creates_file(self, goodtimes_instance, tmp_path): def test_to_txt_format(self, goodtimes_instance, tmp_path): """Test the format of the output file.""" output_path = tmp_path / "goodtimes.txt" - goodtimes_instance.to_txt(output_path) + goodtimes_instance.goodtimes.write_txt(output_path) with open(output_path) as f: lines = f.readlines() @@ -425,7 +440,7 @@ def test_to_txt_format(self, goodtimes_instance, tmp_path): def test_to_txt_values(self, goodtimes_instance, tmp_path): """Test the values in the output file.""" output_path = tmp_path / "goodtimes.txt" - goodtimes_instance.to_txt(output_path) + goodtimes_instance.goodtimes.write_txt(output_path) with open(output_path) as f: line = f.readline() @@ -445,12 +460,12 @@ def test_to_txt_with_culled_bins(self, goodtimes_instance, tmp_path): """Test output when some bins are culled.""" # Flag bins 0-20 for first MET met_val = goodtimes_instance.coords["met"].values[0] - goodtimes_instance.remove_times( + goodtimes_instance.goodtimes.remove_times( met=met_val, bins=np.arange(21), cull=CullCode.LOOSE ) output_path = tmp_path / "goodtimes.txt" - goodtimes_instance.to_txt(output_path) + goodtimes_instance.goodtimes.write_txt(output_path) with open(output_path) as f: first_line = f.readline() @@ -467,12 +482,12 @@ def test_to_txt_with_gaps(self, goodtimes_instance, 
tmp_path): """Test output when bins have gaps.""" # Flag bins 20-70, leaving 0-19 and 71-89 as good met_val = goodtimes_instance.coords["met"].values[0] - goodtimes_instance.remove_times( + goodtimes_instance.goodtimes.remove_times( met=met_val, bins=np.arange(20, 71), cull=CullCode.LOOSE ) output_path = tmp_path / "goodtimes.txt" - goodtimes_instance.to_txt(output_path) + goodtimes_instance.goodtimes.write_txt(output_path) with open(output_path) as f: lines = f.readlines() From 31920d1669d80b56edd2d4d3d744484156e1480d Mon Sep 17 00:00:00 2001 From: Tim Plummer Date: Fri, 19 Dec 2025 11:46:28 -0700 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- imap_processing/hi/hi_goodtimes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index afe3f3c8f..4b21c7861 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -110,7 +110,7 @@ def create_goodtimes_dataset(l1a_de: xr.Dataset) -> xr.Dataset: match = re.match(r"repoint(?P<pointing_num>\d{5})", l1a_de.attrs["Repointing"]) if not match: raise ValueError( - f"Unable to parse sensor number from l1a_de Repointing " + f"Unable to parse pointing number from l1a_de Repointing " f"attribute: {l1a_de.attrs['Repointing']}" ) attrs = { @@ -157,7 +157,7 @@ class GoodtimesAccessor: ESA energy step for each MET timestamp * Attributes * sensor : str - Sensor identifier ('45sensor' or '90sensor') + Sensor identifier ('Hi45' or 'Hi90') * pointing : int Pointing number for this dataset