diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py new file mode 100644 index 000000000..4b21c7861 --- /dev/null +++ b/imap_processing/hi/hi_goodtimes.py @@ -0,0 +1,488 @@ +"""IMAP-HI Goodtimes processing module.""" + +import logging +import re +from enum import IntEnum +from pathlib import Path + +import numpy as np +import xarray as xr + +from imap_processing.hi.utils import parse_sensor_number + +logger = logging.getLogger(__name__) + +# Structured dtype for good time intervals +INTERVAL_DTYPE = np.dtype( + [ + ("met_start", np.float64), + ("met_end", np.float64), + ("spin_bin_low", np.int32), + ("spin_bin_high", np.int32), + ("n_good_bins", np.int32), + ("esa_step", np.uint8), + ] +) + + +class CullCode(IntEnum): + """Cull reason codes for good/bad time classification.""" + + GOOD = 0 + LOOSE = 1 + + +def create_goodtimes_dataset(l1a_de: xr.Dataset) -> xr.Dataset: + """ + Create goodtimes dataset from L1A Direct Event data. + + Initializes all times and spin bins as good (cull_flags=0) for complete + 8-spin periods. Since we receive one packet every 4 spins but only record + MET every 8 spins, we expect MET values to appear in pairs. Only MET values + that appear as duplicates (pairs) are included, as single occurrences indicate + incomplete 8-spin periods. + + Parameters + ---------- + l1a_de : xarray.Dataset + L1A direct event data for this pointing. Used to extract MET timestamps + for each 8-spin interval. + + Returns + ------- + xarray.Dataset + Initialized goodtimes dataset with cull_flags set to 0 (all good) for + complete 8-spin periods only. Access goodtimes methods via the + .goodtimes accessor (e.g., dataset.goodtimes.remove_times()). + """ + logger.info("Creating Goodtimes from L1A Direct Event data") + + # Extract MET times from packet metadata + # Each MET represents one 8-spin histogram packet interval + # Format: seconds + subseconds/1000 + met_all = ( + l1a_de["meta_seconds"].astype(float) + + l1a_de["meta_subseconds"].astype(float) / 1000 + ) + logger.debug(f"Extracted {len(met_all)} total MET entries from L1A DE data") + + # Find unique MET values, their counts, and indices of first occurrences + unique_mets, first_indices, counts = np.unique( + met_all.values, return_index=True, return_counts=True + ) + logger.debug(f"Found {len(unique_mets)} unique MET values") + + # Keep only MET values that appear as pairs (count == 2) + paired_mask = counts == 2 + first_occurrence_indices = first_indices[paired_mask] + + n_paired = int(np.sum(paired_mask)) + n_unpaired = len(unique_mets) - n_paired + logger.info( + f"Filtered to {n_paired} complete 8-spin periods " + f"(excluded {n_unpaired} incomplete periods)" + ) + + # Extract data for paired METs only + met = met_all.isel(epoch=first_occurrence_indices) + esa_step = l1a_de["esa_step"].isel(epoch=first_occurrence_indices) + + # Create coordinates + coords = { + "met": met.values, + "spin_bin": np.arange(90), + } + + # Create data variables + # Initialize cull_flags - all good (0) by default + # Shape: (n_met_timestamps, 90 spin_bins) + # Per alg doc Section 2.2.4: 90-element arrays, one per histogram packet + data_vars = { + "cull_flags": xr.DataArray( + np.zeros((len(met), 90), dtype=np.uint8), + dims=["met", "spin_bin"], + ), + "esa_step": esa_step, + } + + # Create attributes + sensor_number = parse_sensor_number(l1a_de.attrs["Logical_source"]) + match = re.match(r"repoint(?P\d{5})", l1a_de.attrs["Repointing"]) + if not match: + raise ValueError( + f"Unable to parse pointing number from l1a_de Repointing " + f"attribute: {l1a_de.attrs['Repointing']}" + ) + attrs = { + "sensor": f"Hi{sensor_number}", + "pointing": int(match["pointing_num"]), + } + + return xr.Dataset(data_vars, coords, attrs) + + +@xr.register_dataset_accessor("goodtimes") +class GoodtimesAccessor: + """ + Extend xarray.Dataset with accessor for IMAP-Hi Good Times operations. + + Provides methods to track and manage good/bad time intervals for a single + Pointing based on validation checks defined in the IMAP-Hi Algorithm + Document Section 2.2.4 and 2.3.2. + + The accessor operates on xr.Dataset objects created by create_goodtimes_dataset(). + The dataset maintains a cull_flags array initialized to all zeros (good). + As bad times are identified by validation algorithms, they are flagged via + the `remove_times()` method with a non-zero cull code. + + Cull Codes: + * 0 : Good time (default) + * 1-N : Bad time, with specific cull reason code + + Expected xarray.Dataset structure: + * Dimensions: + * met : int + Number of MET timestamps (one per 8-spin histogram packet, ~90 per pointing) + * spin_bin : int + Number of spin angle bins (90 bins covering 0-360 degrees) + * Coordinates + * met : numpy.ndarray + Mission Elapsed Time values for each 8-spin interval + * spin_bin : numpy.ndarray + Spin bin indices (0-89) + * Data Variables + * cull_flags : xarray.DataArray (met, spin_bin) + Cull flags where 0=good time, non-zero=bad time with cull reason code + * esa_step : xarray.DataArray (met,) + ESA energy step for each MET timestamp + * Attributes + * sensor : str + Sensor identifier ('Hi45' or 'Hi90') + * pointing : int + Pointing number for this dataset + + Parameters + ---------- + xarray_obj : xarray.Dataset + The xarray Dataset to wrap with goodtimes accessor functionality. + + Examples + -------- + >>> gt_dataset = create_goodtimes_dataset(l1a_de) + >>> gt_dataset.goodtimes.remove_times(met=1000.5, cull=CullCode.LOOSE) + >>> intervals = gt_dataset.goodtimes.get_good_intervals() + """ + + def __init__(self, xarray_obj: xr.Dataset) -> None: + """Initialize the accessor with an xarray Dataset.""" + self._obj = xarray_obj + + def remove_times( + self, + met: np.ndarray | float | tuple[float, float], + bins: np.ndarray | int | None = None, + cull: int = 1, + ) -> None: + """ + Flag specific MET times and spin bins as bad times with a cull code. + + This method is called by external validation algorithms when bad times + are identified. It sets the cull_flags to the specified non-zero cull code + for the given MET timestamps and spin bins. + + Parameters + ---------- + met : numpy.ndarray, float, or tuple of (float, float) + MET timestamp(s) to flag as bad. Can be: + - Single float: one MET timestamp + - Tuple of (start, end): time range (inclusive) + - Array of floats: multiple MET timestamps + bins : numpy.ndarray, int, or None + Spin bin(s) to flag as bad. Can be: + - None: flag all spin bins (0-89) for the given MET(s) + - Single int: one spin bin + - Array of ints: multiple spin bins + cull : int + Cull reason code (non-zero). Different validation checks can use + different codes to identify the reason for culling: + - 1: Loose criterion + - etc. + + Notes + ----- + If a time/bin is already flagged with a different cull code, this method + will overwrite it with the new cull code. Consider implementing logic to + preserve or combine cull codes if needed. + + Examples + -------- + >>> # Flag all spin bins for MET=1000.5 as loose (cull=1) + >>> goodtimes.remove_times(met=1000.5, bins=None, cull=CullCode.LOOSE) + + >>> # Flag spin bins 0-10 for MET=1000.5 + >>> goodtimes.remove_times(met=1000.5, bins=np.arange(11), cull=CullCode.LOOSE) + + >>> # Flag time range around a repoint (240s before/after) + >>> repoint_time = 1000.0 + >>> goodtimes.remove_times( + ... met=(repoint_time - 240, repoint_time + 240), + ... cull=CullCode.LOOSE + ... ) + + >>> # Flag multiple specific METs, all bins + >>> goodtimes.remove_times( + ... met=np.array([1000.5, 1001.5]), bins=None, cull=CullCode.LOOSE + ... ) + """ + if cull == 0: + raise ValueError("Cull code must be non-zero. Use 0 only for good times.") + + # Handle bins parameter + if bins is None: + # Flag all spin bins (0-89) + bins_array = np.arange(90) + else: + # Convert to array for consistent handling + bins_array = np.atleast_1d(bins) + + # Validate bin indices + if np.any((bins_array < 0) | (bins_array >= 90)): + raise ValueError("Spin bins must be in range [0, 89]") + + met_values = self._obj.coords["met"].values + + # Handle time range input (tuple of start, end) + if isinstance(met, tuple) and len(met) == 2: + met_start, met_end = met + # Find all MET indices within the range + in_range = (met_values >= met_start) & (met_values <= met_end) + met_indices = np.nonzero(in_range)[0] + else: + # Convert met to array for consistent handling + met_array = np.atleast_1d(met) + + # Find indices of largest MET that is <= each met_val (vectorized) + # searchsorted with side='right' gives first index where value would go + # Subtract 1 to get the largest value <= met_val + met_indices = np.searchsorted(met_values, met_array, side="right") - 1 + + # Check for invalid indices (< 0 or >= len(met_values)) + valid_mask = (met_indices >= 0) & (met_indices < len(met_values) - 1) + if len(met_indices) == 0 or not np.all(valid_mask): + invalid_mets = met_array[~valid_mask] if len(met_array) > 1 else met_array + raise ValueError( + f"MET value(s) outside valid range: {invalid_mets}. " + f"Valid range: [{met_values[0]}, {met_values[-1]}]" + ) + met_indices = met_indices[valid_mask] + + # Set cull_flags for all indices + n_times = len(met_indices) + n_bins = len(bins_array) + logger.debug( + f"Flagging {n_times} MET time(s) x {n_bins} spin bin(s) with " + f"cull code {cull}" + ) + self._obj["cull_flags"].values[np.ix_(met_indices, bins_array)] = cull + + def get_good_intervals(self) -> np.ndarray: + """ + Extract good time intervals for each MET timestamp. + + Creates an interval for each MET time that has good bins. Since ESA step + changes at each MET, each MET gets its own interval(s). + + If good bins wrap around the 89->0 boundary (e.g., bins 88,89,0,1), multiple + intervals are created for the same MET time, one for each contiguous set. + + Returns + ------- + numpy.ndarray + Structured array with dtype INTERVAL_DTYPE containing: + - met_start: MET timestamp of interval + - met_end: MET timestamp of interval (same as met_start) + - spin_bin_low: Lowest good spin bin in interval + - spin_bin_high: Highest good spin bin in interval + - n_good_bins: Number of good bins + - esa_step: ESA energy step for this MET + + Notes + ----- + This is used for generating the Good Times output files per algorithm + document Section 2.3.2.5. + """ + logger.debug("Extracting good time intervals") + intervals: list[np.void] = [] + met_values = self._obj.coords["met"].values + cull_flags = self._obj["cull_flags"].values + esa_steps = self._obj["esa_step"].values + + if len(met_values) == 0: + logger.warning("No MET values found, returning empty intervals array") + return np.array([], dtype=INTERVAL_DTYPE) + + # Process each MET time + for met_idx in range(len(met_values)): + self._add_intervals_for_pattern( + intervals, + met_values[met_idx], + met_values[met_idx], # met_start == met_end + cull_flags[met_idx, :], + esa_steps[met_idx], + ) + + logger.info(f"Extracted {len(intervals)} good time intervals") + return np.array(intervals, dtype=INTERVAL_DTYPE) + + def _add_intervals_for_pattern( + self, + intervals: list, + met_start: float, + met_end: float, + pattern: np.ndarray, + esa_step: int, + ) -> None: + """ + Add interval(s) for a cull_flags pattern, splitting if bins wrap around. + + Parameters + ---------- + intervals : list + List to append interval tuples to. + met_start : float + Start MET timestamp. + met_end : float + End MET timestamp. + pattern : numpy.ndarray + Cull flags pattern for spin bins. + esa_step : int + ESA energy step for this MET. + """ + good_bins = np.nonzero(pattern == 0)[0] + + if len(good_bins) == 0: + return + + # Check for gaps in good_bins (indicating separate contiguous regions) + # Bins are contiguous if difference between consecutive bins is 1 + gaps = np.nonzero(np.diff(good_bins) > 1)[0] + + if len(gaps) == 0: + # No gaps - single contiguous region + interval = ( + met_start, + met_end, + good_bins[0], + good_bins[-1], + len(good_bins), + esa_step, + ) + intervals.append(interval) + else: + # Multiple contiguous regions - split at gaps + start_idx = 0 + for gap_idx in gaps: + # Create interval for bins before the gap + bins_segment = good_bins[start_idx : gap_idx + 1] + interval = ( + met_start, + met_end, + bins_segment[0], + bins_segment[-1], + len(bins_segment), + esa_step, + ) + intervals.append(interval) + start_idx = gap_idx + 1 + + # Handle final segment after last gap + bins_segment = good_bins[start_idx:] + interval = ( + met_start, + met_end, + bins_segment[0], + bins_segment[-1], + len(bins_segment), + esa_step, + ) + intervals.append(interval) + + def get_cull_statistics(self) -> dict: + """ + Calculate statistics on cull codes for diagnostics. + + Returns + ------- + dict + Dictionary with cull code statistics: + - total_bins: Total number of MET × spin_bin combinations + - good_bins: Number of bins with cull_flags=0 + - culled_bins: Number of bins with cull_flags>0 + - fraction_good: Fraction of bins that are good + - cull_code_counts: Dict mapping cull codes to counts + """ + total_bins = self._obj["cull_flags"].size + good_bins = int(np.sum(self._obj["cull_flags"].values == 0)) + culled_bins = total_bins - good_bins + + # Count occurrences of each cull code + unique_codes, counts = np.unique( + self._obj["cull_flags"].values[self._obj["cull_flags"].values > 0], + return_counts=True, + ) + cull_code_counts = dict( + zip(unique_codes.tolist(), counts.tolist(), strict=False) + ) + + return { + "total_bins": int(total_bins), + "good_bins": int(good_bins), + "culled_bins": int(culled_bins), + "fraction_good": good_bins / total_bins if total_bins > 0 else 0.0, + "cull_code_counts": cull_code_counts, + } + + def write_txt(self, output_path: Path) -> Path: + """ + Write good times to text file in the format specified by algorithm document. + + Format per Section 2.3.2.5: + pointing MET_start MET_end spin_bin_low spin_bin_high sensor esa_step + [rate/sigma values...] + + Parameters + ---------- + output_path : pathlib.Path + Path where the text file should be written. + + Returns + ------- + pathlib.Path + Path to the created file. + """ + logger.info(f"Writing good times to file: {output_path}") + intervals = self.get_good_intervals() + + with open(output_path, "w") as f: + for interval in intervals: + pointing = self._obj.attrs.get("pointing", 0) + sensor = self._obj.attrs.get("sensor", "45sensor") + + # Format: + # pointing met_start met_end spin_bin_low spin_bin_high sensor esa_step + line = ( + f"{pointing:05d} " + f"{int(interval['met_start'])} " + f"{int(interval['met_end'])} " + f"{interval['spin_bin_low']} " + f"{interval['spin_bin_high']} " + f"{sensor} " + f"{interval['esa_step']}" + ) + + # TODO: Add rate/sigma values for each ESA step + + f.write(line + "\n") + + logger.info(f"Wrote {len(intervals)} intervals to {output_path}") + return output_path diff --git a/imap_processing/tests/hi/test_hi_goodtimes.py b/imap_processing/tests/hi/test_hi_goodtimes.py new file mode 100644 index 000000000..8df4011b2 --- /dev/null +++ b/imap_processing/tests/hi/test_hi_goodtimes.py @@ -0,0 +1,530 @@ +"""Test coverage for imap_processing.hi.hi_goodtimes.py""" + +import numpy as np +import pytest +import xarray as xr + +from imap_processing.hi.hi_goodtimes import ( + INTERVAL_DTYPE, + CullCode, + create_goodtimes_dataset, +) + + +@pytest.fixture +def mock_l1a_de(): + """Create a mock L1A Direct Event dataset for testing.""" + # Create 10 unique MET times, each appearing twice (paired) + # Plus 2 unpaired MET times + n_paired = 10 + + # Paired METs: each appears twice + paired_mets = np.arange(1000.0, 1000.0 + n_paired * 10, 10) + met_seconds = np.repeat(paired_mets.astype(int), 2) + met_subseconds = np.zeros(len(met_seconds)) + + # Add unpaired METs + unpaired_mets = np.array([2000.0, 3000.0]) + met_seconds = np.concatenate([met_seconds, unpaired_mets.astype(int)]) + met_subseconds = np.concatenate([met_subseconds, np.zeros(len(unpaired_mets))]) + + # ESA step cycles through values + esa_step = np.tile(np.arange(1, 11), len(met_seconds) // 10 + 1)[: len(met_seconds)] + + ds = xr.Dataset( + { + "meta_seconds": (["epoch"], met_seconds), + "meta_subseconds": (["epoch"], met_subseconds), + "esa_step": (["epoch"], esa_step.astype(np.uint8)), + }, + attrs={ + "Logical_source": "imap_hi_l1a_45sensor-de", + "Repointing": "repoint00042", + }, + ) + return ds + + +@pytest.fixture +def goodtimes_instance(mock_l1a_de): + """Create a goodtimes dataset for testing.""" + return create_goodtimes_dataset(mock_l1a_de) + + +class TestCullCode: + """Test suite for CullCode IntEnum.""" + + def test_cull_code_values(self): + """Test CullCode enum values.""" + assert CullCode.GOOD == 0 + assert CullCode.LOOSE == 1 + + def test_cull_code_is_int(self): + """Test that CullCode values are integers.""" + assert isinstance(CullCode.GOOD, int) + assert isinstance(CullCode.LOOSE, int) + + +class TestGoodtimesFromL1aDe: + """Test suite for Goodtimes.from_l1a_de() classmethod.""" + + def test_from_l1a_de_basic(self, mock_l1a_de): + """Test basic creation from L1A DE data.""" + gt = create_goodtimes_dataset(mock_l1a_de) + + assert isinstance(gt, xr.Dataset) + + def test_from_l1a_de_filters_unpaired_mets(self, mock_l1a_de): + """Test that unpaired METs are filtered out.""" + gt = create_goodtimes_dataset(mock_l1a_de) + + # Should have 10 paired METs (20 total entries -> 10 unique paired) + assert len(gt.coords["met"]) == 10 + + def test_from_l1a_de_dimensions(self, goodtimes_instance): + """Test that dimensions are correct.""" + assert "met" in goodtimes_instance.dims + assert "spin_bin" in goodtimes_instance.dims + assert goodtimes_instance.dims["spin_bin"] == 90 + + def test_from_l1a_de_coordinates(self, goodtimes_instance): + """Test that coordinates are set correctly.""" + assert "met" in goodtimes_instance.coords + assert "spin_bin" in goodtimes_instance.coords + + # spin_bin should be 0-89 + np.testing.assert_array_equal( + goodtimes_instance.coords["spin_bin"].values, np.arange(90) + ) + + def test_from_l1a_de_data_variables(self, goodtimes_instance): + """Test that data variables are created.""" + assert "cull_flags" in goodtimes_instance.data_vars + assert "esa_step" in goodtimes_instance.data_vars + + def test_from_l1a_de_cull_flags_initialized_to_zero(self, goodtimes_instance): + """Test that cull_flags are initialized to 0 (good).""" + assert np.all(goodtimes_instance["cull_flags"].values == 0) + + def test_from_l1a_de_cull_flags_shape(self, goodtimes_instance): + """Test cull_flags array shape.""" + n_met = len(goodtimes_instance.coords["met"]) + assert goodtimes_instance["cull_flags"].shape == (n_met, 90) + + def test_from_l1a_de_esa_step_preserved(self, mock_l1a_de, goodtimes_instance): + """Test that ESA step values are preserved for paired METs.""" + # Get first occurrence of each paired MET + met_all = mock_l1a_de["meta_seconds"].values.astype(float) + unique_mets, first_indices, counts = np.unique( + met_all, return_index=True, return_counts=True + ) + paired_mask = counts == 2 + expected_esa_steps = mock_l1a_de["esa_step"].values[first_indices[paired_mask]] + + np.testing.assert_array_equal( + goodtimes_instance["esa_step"].values, expected_esa_steps + ) + + def test_from_l1a_de_attributes(self, goodtimes_instance): + """Test that attributes are set correctly.""" + assert goodtimes_instance.attrs["sensor"] == "Hi45" + assert goodtimes_instance.attrs["pointing"] == 42 + + +class TestRemoveTimes: + """Test suite for Goodtimes.remove_times() method.""" + + def test_remove_times_single_met_all_bins(self, goodtimes_instance): + """Test flagging a single MET with all bins.""" + met_val = goodtimes_instance.coords["met"].values[0] + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) + + # Check that all bins for the first MET are flagged + assert np.all(goodtimes_instance["cull_flags"].values[0, :] == CullCode.LOOSE) + + # Check that other METs are still good + assert np.all(goodtimes_instance["cull_flags"].values[1:, :] == CullCode.GOOD) + + def test_remove_times_single_met_specific_bins(self, goodtimes_instance): + """Test flagging specific bins for a single MET.""" + met_val = goodtimes_instance.coords["met"].values[0] + bins_to_flag = np.array([0, 1, 2, 10]) + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=bins_to_flag, cull=CullCode.LOOSE + ) + + # Check that specified bins are flagged + assert np.all( + goodtimes_instance["cull_flags"].values[0, bins_to_flag] == CullCode.LOOSE + ) + + # Check that other bins are still good + other_bins = np.setdiff1d(np.arange(90), bins_to_flag) + assert np.all( + goodtimes_instance["cull_flags"].values[0, other_bins] == CullCode.GOOD + ) + + def test_remove_times_multiple_mets(self, goodtimes_instance): + """Test flagging multiple METs.""" + met_vals = goodtimes_instance.coords["met"].values[:3] + goodtimes_instance.goodtimes.remove_times( + met=met_vals, bins=None, cull=CullCode.LOOSE + ) + + # Check that first 3 METs are flagged + assert np.all(goodtimes_instance["cull_flags"].values[:3, :] == CullCode.LOOSE) + + # Check that other METs are still good + assert np.all(goodtimes_instance["cull_flags"].values[3:, :] == CullCode.GOOD) + + def test_remove_times_time_range(self, goodtimes_instance): + """Test flagging a time range.""" + met_vals = goodtimes_instance.coords["met"].values + met_start = met_vals[2] + met_end = met_vals[5] + + goodtimes_instance.goodtimes.remove_times( + met=(met_start, met_end), bins=None, cull=CullCode.LOOSE + ) + + # Check that METs 2-5 are flagged + assert np.all(goodtimes_instance["cull_flags"].values[2:6, :] == CullCode.LOOSE) + + # Check that other METs are still good + assert np.all(goodtimes_instance["cull_flags"].values[:2, :] == CullCode.GOOD) + assert np.all(goodtimes_instance["cull_flags"].values[6:, :] == CullCode.GOOD) + + def test_remove_times_invalid_cull_code_zero(self, goodtimes_instance): + """Test that cull code 0 raises ValueError.""" + met_val = goodtimes_instance.coords["met"].values[0] + with pytest.raises(ValueError, match="Cull code must be non-zero"): + goodtimes_instance.goodtimes.remove_times(met=met_val, cull=0) + + def test_remove_times_invalid_bin_indices(self, goodtimes_instance): + """Test that invalid bin indices raise ValueError.""" + met_val = goodtimes_instance.coords["met"].values[0] + + # Test bin < 0 + with pytest.raises(ValueError, match="Spin bins must be in range"): + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.array([-1, 0]) + ) + + # Test bin >= 90 + with pytest.raises(ValueError, match="Spin bins must be in range"): + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.array([89, 90]) + ) + + def test_remove_times_met_out_of_range(self, goodtimes_instance): + """Test that MET outside valid range raises ValueError.""" + met_vals = goodtimes_instance.coords["met"].values + met_out_of_range = met_vals[-1] + 1000 + + with pytest.raises(ValueError, match="MET value\\(s\\) outside valid range"): + goodtimes_instance.goodtimes.remove_times(met=met_out_of_range) + + def test_remove_times_overwrites_existing_cull(self, goodtimes_instance): + """Test that new cull code overwrites existing one.""" + met_val = goodtimes_instance.coords["met"].values[0] + + # Flag with LOOSE + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) + assert np.all(goodtimes_instance["cull_flags"].values[0, :] == CullCode.LOOSE) + + # Overwrite with a different cull code + goodtimes_instance.goodtimes.remove_times(met=met_val, bins=None, cull=2) + assert np.all(goodtimes_instance["cull_flags"].values[0, :] == 2) + + +class TestGetGoodIntervals: + """Test suite for Goodtimes.get_good_intervals() method.""" + + def test_get_good_intervals_all_good(self, goodtimes_instance): + """Test getting intervals when all times are good.""" + intervals = goodtimes_instance.goodtimes.get_good_intervals() + + # Should have one interval per MET + n_met = len(goodtimes_instance.coords["met"]) + assert len(intervals) == n_met + + # Check interval structure + assert intervals.dtype == INTERVAL_DTYPE + + def test_get_good_intervals_structure(self, goodtimes_instance): + """Test interval structure and field names.""" + intervals = goodtimes_instance.goodtimes.get_good_intervals() + + # Check that all fields exist + assert "met_start" in intervals.dtype.names + assert "met_end" in intervals.dtype.names + assert "spin_bin_low" in intervals.dtype.names + assert "spin_bin_high" in intervals.dtype.names + assert "n_good_bins" in intervals.dtype.names + assert "esa_step" in intervals.dtype.names + + def test_get_good_intervals_all_good_values(self, goodtimes_instance): + """Test interval values when all bins are good.""" + intervals = goodtimes_instance.goodtimes.get_good_intervals() + + # When all bins are good, should have bins 0-89 + for interval in intervals: + assert interval["spin_bin_low"] == 0 + assert interval["spin_bin_high"] == 89 + assert interval["n_good_bins"] == 90 + assert interval["met_start"] == interval["met_end"] + + def test_get_good_intervals_with_culled_bins(self, goodtimes_instance): + """Test intervals when some bins are culled.""" + # Flag bins 0-20 for first MET + met_val = goodtimes_instance.coords["met"].values[0] + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.arange(21), cull=CullCode.LOOSE + ) + + intervals = goodtimes_instance.goodtimes.get_good_intervals() + + # First interval should only have bins 21-89 + assert intervals[0]["spin_bin_low"] == 21 + assert intervals[0]["spin_bin_high"] == 89 + assert intervals[0]["n_good_bins"] == 69 + + def test_get_good_intervals_with_gaps(self, goodtimes_instance): + """Test intervals when good bins have gaps (wraparound).""" + # Flag bins 20-70 for first MET, leaving bins 0-19 and 71-89 as good + met_val = goodtimes_instance.coords["met"].values[0] + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.arange(20, 71), cull=CullCode.LOOSE + ) + + intervals = goodtimes_instance.goodtimes.get_good_intervals() + + # Should create 2 intervals for the first MET (bins split by gap) + # Plus 9 more intervals for the remaining METs + assert len(intervals) == 11 + + # First two intervals should be for the same MET + assert intervals[0]["met_start"] == intervals[1]["met_start"] + + # Check the two segments + assert intervals[0]["spin_bin_low"] == 0 + assert intervals[0]["spin_bin_high"] == 19 + assert intervals[1]["spin_bin_low"] == 71 + assert intervals[1]["spin_bin_high"] == 89 + + def test_get_good_intervals_all_bins_culled(self, goodtimes_instance): + """Test intervals when all bins are culled for a MET.""" + # Flag all bins for first MET + met_val = goodtimes_instance.coords["met"].values[0] + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) + + intervals = goodtimes_instance.goodtimes.get_good_intervals() + + # Should have 9 intervals (one per good MET, excluding the first) + assert len(intervals) == 9 + + # First interval should be for the second MET + assert intervals[0]["met_start"] == goodtimes_instance.coords["met"].values[1] + + def test_get_good_intervals_empty(self): + """Test intervals with empty goodtimes dataset.""" + # Create empty dataset + gt = xr.Dataset( + data_vars={ + "cull_flags": xr.DataArray( + np.zeros((0, 90), dtype=np.uint8), dims=["met", "spin_bin"] + ), + "esa_step": xr.DataArray(np.array([], dtype=np.uint8), dims=["met"]), + }, + coords={"met": np.array([]), "spin_bin": np.arange(90)}, + attrs={"sensor": "Hi45", "pointing": 0}, + ) + + intervals = gt.goodtimes.get_good_intervals() + assert len(intervals) == 0 + + def test_get_good_intervals_esa_step_included(self, goodtimes_instance): + """Test that ESA step is included in intervals.""" + intervals = goodtimes_instance.goodtimes.get_good_intervals() + + # Check that each interval has an ESA step + for i, interval in enumerate(intervals): + expected_esa_step = goodtimes_instance["esa_step"].values[i] + assert interval["esa_step"] == expected_esa_step + + +class TestGetCullStatistics: + """Test suite for Goodtimes.get_cull_statistics() method.""" + + def test_get_cull_statistics_all_good(self, goodtimes_instance): + """Test statistics when all bins are good.""" + stats = goodtimes_instance.goodtimes.get_cull_statistics() + + total_bins = len(goodtimes_instance.coords["met"]) * 90 + assert stats["total_bins"] == total_bins + assert stats["good_bins"] == total_bins + assert stats["culled_bins"] == 0 + assert stats["fraction_good"] == 1.0 + assert stats["cull_code_counts"] == {} + + def test_get_cull_statistics_with_culls(self, goodtimes_instance): + """Test statistics after culling some bins.""" + # Flag first MET, all bins + met_val = goodtimes_instance.coords["met"].values[0] + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=None, cull=CullCode.LOOSE + ) + + stats = goodtimes_instance.goodtimes.get_cull_statistics() + + total_bins = len(goodtimes_instance.coords["met"]) * 90 + assert stats["total_bins"] == total_bins + assert stats["good_bins"] == total_bins - 90 + assert stats["culled_bins"] == 90 + assert stats["fraction_good"] == (total_bins - 90) / total_bins + assert stats["cull_code_counts"][CullCode.LOOSE] == 90 + + def test_get_cull_statistics_multiple_cull_codes(self, goodtimes_instance): + """Test statistics with multiple cull codes.""" + met_vals = goodtimes_instance.coords["met"].values + + # Flag first MET with LOOSE + goodtimes_instance.goodtimes.remove_times( + met=met_vals[0], bins=None, cull=CullCode.LOOSE + ) + + # Flag second MET with code 2 + goodtimes_instance.goodtimes.remove_times(met=met_vals[1], bins=None, cull=2) + + stats = goodtimes_instance.goodtimes.get_cull_statistics() + + assert stats["culled_bins"] == 180 + assert stats["cull_code_counts"][CullCode.LOOSE] == 90 + assert stats["cull_code_counts"][2] == 90 + + +class TestToTxt: + """Test suite for Goodtimes.to_txt() method.""" + + def test_to_txt_creates_file(self, goodtimes_instance, tmp_path): + """Test that to_txt creates a file.""" + output_path = tmp_path / "goodtimes.txt" + result = goodtimes_instance.goodtimes.write_txt(output_path) + + assert result == output_path + assert output_path.exists() + + def test_to_txt_format(self, goodtimes_instance, tmp_path): + """Test the format of the output file.""" + output_path = tmp_path / "goodtimes.txt" + goodtimes_instance.goodtimes.write_txt(output_path) + + with open(output_path) as f: + lines = f.readlines() + + # Should have one line per interval (10 METs, all good) + assert len(lines) == 10 + + # Check format of first line + parts = lines[0].strip().split() + assert len(parts) == 7 + assert parts[0] == "00042" # pointing + assert parts[5] == "Hi45" # sensor + + def test_to_txt_values(self, goodtimes_instance, tmp_path): + """Test the values in the output file.""" + output_path = tmp_path / "goodtimes.txt" + goodtimes_instance.goodtimes.write_txt(output_path) + + with open(output_path) as f: + line = f.readline() + + parts = line.strip().split() + pointing, met_start, met_end, bin_low, bin_high, sensor, esa_step = parts + + assert pointing == "00042" + assert int(met_start) == int(goodtimes_instance.coords["met"].values[0]) + assert int(met_end) == int(goodtimes_instance.coords["met"].values[0]) + assert int(bin_low) == 0 + assert int(bin_high) == 89 + assert sensor == "Hi45" + assert int(esa_step) == goodtimes_instance["esa_step"].values[0] + + def test_to_txt_with_culled_bins(self, goodtimes_instance, tmp_path): + """Test output when some bins are culled.""" + # Flag bins 0-20 for first MET + met_val = goodtimes_instance.coords["met"].values[0] + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.arange(21), cull=CullCode.LOOSE + ) + + output_path = tmp_path / "goodtimes.txt" + goodtimes_instance.goodtimes.write_txt(output_path) + + with open(output_path) as f: + first_line = f.readline() + + parts = first_line.strip().split() + bin_low = int(parts[3]) + bin_high = int(parts[4]) + + # First interval should only include bins 21-89 + assert bin_low == 21 + assert bin_high == 89 + + def test_to_txt_with_gaps(self, goodtimes_instance, tmp_path): + """Test output when bins have gaps.""" + # Flag bins 20-70, leaving 0-19 and 71-89 as good + met_val = goodtimes_instance.coords["met"].values[0] + goodtimes_instance.goodtimes.remove_times( + met=met_val, bins=np.arange(20, 71), cull=CullCode.LOOSE + ) + + output_path = tmp_path / "goodtimes.txt" + goodtimes_instance.goodtimes.write_txt(output_path) + + with open(output_path) as f: + lines = f.readlines() + + # Should have 11 lines (2 for first MET, 1 for each of 9 remaining METs) + assert len(lines) == 11 + + # First two lines should be for same MET + parts1 = lines[0].strip().split() + parts2 = lines[1].strip().split() + assert parts1[1] == parts2[1] # Same met_start + + # Check bin ranges + assert int(parts1[3]) == 0 + assert int(parts1[4]) == 19 + assert int(parts2[3]) == 71 + assert int(parts2[4]) == 89 + + +class TestIntervalDtype: + """Test suite for INTERVAL_DTYPE.""" + + def test_interval_dtype_fields(self): + """Test that INTERVAL_DTYPE has correct fields.""" + field_names = INTERVAL_DTYPE.names + assert "met_start" in field_names + assert "met_end" in field_names + assert "spin_bin_low" in field_names + assert "spin_bin_high" in field_names + assert "n_good_bins" in field_names + assert "esa_step" in field_names + + def test_interval_dtype_types(self): + """Test that INTERVAL_DTYPE has correct field types.""" + assert INTERVAL_DTYPE["met_start"] == np.float64 + assert INTERVAL_DTYPE["met_end"] == np.float64 + assert INTERVAL_DTYPE["spin_bin_low"] == np.int32 + assert INTERVAL_DTYPE["spin_bin_high"] == np.int32 + assert INTERVAL_DTYPE["n_good_bins"] == np.int32 + assert INTERVAL_DTYPE["esa_step"] == np.uint8