diff --git a/VERSION b/VERSION index 0d5696b3..82977005 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.0+2026.02.02T20.54.51.168Z.c0f2a8f0.berickson.20260113.extension.tab.ids +0.1.0+2026.02.03T15.21.57.253Z.1310c89e.berickson.20260130.execution.dag.single.value diff --git a/docs/how-to/communication_protocol.md b/docs/how-to/communication_protocol.md index b42484a8..a0725329 100644 --- a/docs/how-to/communication_protocol.md +++ b/docs/how-to/communication_protocol.md @@ -102,8 +102,9 @@ executor can build the right Redis keys. **Preferred: `scope_fields` (supports arbitrary scopes)** -Use `scope_fields` to supply each scope axis with a `values` iterable and a `path` -into each item. The scope field names should match the reducer scope: `student`, +Use `scope_fields` to supply each scope axis with either a `values` iterable or a +single value (applied across all items), plus an optional `path` into each item. +The scope field names should match the reducer scope: `student`, `doc_id`, `tab_id`, `page_id`, etc. ```python @@ -113,6 +114,8 @@ reducer_keys = query.keys( "student": {"values": query.variable("roster"), "path": "user_id"}, "doc_id": {"values": query.variable("documents"), "path": "doc_id"}, "tab_id": {"values": query.variable("tabs"), "path": "tab_id"}, + # or a single value + "student": "bobs_user_id" }, ) ``` diff --git a/learning_observer/VERSION b/learning_observer/VERSION index fcc4404a..82977005 100644 --- a/learning_observer/VERSION +++ b/learning_observer/VERSION @@ -1 +1 @@ -0.1.0+2026.02.02T16.19.01.249Z.2ae8ac1a.berickson.20260120.comm.protocol.targets +0.1.0+2026.02.03T15.21.57.253Z.1310c89e.berickson.20260130.execution.dag.single.value diff --git a/learning_observer/learning_observer/communication_protocol/executor.py b/learning_observer/learning_observer/communication_protocol/executor.py index 46411c9a..ef59307d 100644 --- a/learning_observer/learning_observer/communication_protocol/executor.py +++ b/learning_observer/learning_observer/communication_protocol/executor.py @@ -602,18 +602,125 @@ def _scope_field_candidates(field): return [] +# ============================================================================== +# Scope Value Handling +# ============================================================================== +# +# Scope fields can be specified as: +# 1. A single value (string, None, etc.) - broadcast across all items +# 2. An iterable of values - one per scope entry +# 3. An async iterable - same as above, but async +# +# SingleValue wraps case 1 to distinguish it from iterables. + + +class SingleValue: + """Wrapper indicating a single value to broadcast across all scope items. + + When building keys across multiple scope dimensions, single values are + repeated infinitely so they can be zipped with finite iterables from + other dimensions. + + Example: student="bob_id" with documents=[doc1, doc2, doc3] produces + keys for (bob_id, doc1), (bob_id, doc2), (bob_id, doc3). + """ + __slots__ = ('value',) + + def __init__(self, value): + self.value = value + + def __repr__(self): + return f'SingleValue({self.value!r})' + + +def _is_single_value(value): + """Check if value is a SingleValue wrapper.""" + return isinstance(value, SingleValue) + + +def _normalize_scope_value(value): + """Normalize a scope field value for consistent handling. + + - None, strings, and other scalars become SingleValue (broadcast) + - Lists and iterables pass through (one item per scope entry) + - Async iterables pass through unchanged + - Dicts pass through (will be iterated or accessed via path) + """ + if value is None: + return SingleValue(None) + if _is_single_value(value): + return value + if isinstance(value, collections.abc.AsyncIterable): + return value + if isinstance(value, (str, bytes)): + return SingleValue(value) + if isinstance(value, dict): + # Dicts represent structured data, not a collection to iterate + return value + if isinstance(value, collections.abc.Iterable): + return value + return SingleValue(value) + + +async def _repeat_forever(value): + """Async generator that yields the same value indefinitely.""" + while True: + yield value + + +def _expand_scope_value(value, *, broadcast=False): + """Convert a normalized scope value to an iterable. + + Args: + value: A normalized scope value (SingleValue or iterable). + broadcast: If True, single values repeat infinitely for zipping + with other dimensions. If False, yield once. + + Returns: + An iterable (sync or async) suitable for iteration. + """ + if _is_single_value(value): + return _repeat_forever(value.value) if broadcast else [value.value] + return value + + +def _parse_scope_spec(spec): + """Parse a scope specification into (values, path). + + Supports formats: + roster -> (roster, None) + {"values": roster, "path": "user_id"} -> (roster, "user_id") + {"value": roster} -> (roster, None) + + Returns: + Tuple of (values, path) where path may be None. + """ + if not isinstance(spec, dict): + return spec, None + + # Check for known value keys in priority order + for key in ('values', 'value', 'items', 'data'): + if key in spec: + values = spec[key] + path = spec.get('path') or spec.get('value_path') + return values, path + + # No recognized keys - treat entire dict as the value + return spec, None + + def _normalize_scope_field_specs(raw_specs): + """Normalize scope field specifications to a consistent format. + + Returns: + Dict mapping normalized field names to {"values": ..., "path": ...} + """ normalized = {} for key, spec in raw_specs.items(): normalized_key = _normalize_scope_field_key(key) - if isinstance(spec, dict): - values = spec.get('values', spec.get('value', spec.get('items', spec.get('data')))) - path = spec.get('path', spec.get('value_path')) - else: - values = spec - path = None + values, path = _parse_scope_spec(spec) normalized[normalized_key] = { - 'values': values, + 'values': _normalize_scope_value(values), 'path': path } return normalized @@ -640,18 +747,14 @@ async def _async_zip_many(iterables): async def _extract_fields_with_provenance(scope_specs): - '''Prepare the key field dictionary and provenance for each scope tuple. - The key field dictionary is used to create the key we are attempting - to fetch from the KVS (used later in `hack_handle_keys`). The passed in - `path` is used for setting the appropriate dictionary value. - The provenance is the current history of the communication protocol for each item. - ''' + """Prepare the key field dictionary and provenance for each scope tuple.""" if not scope_specs: return if len(scope_specs) == 1: + # Single dimension: simple iteration field, values, path = scope_specs[0] - async for item in ensure_async_generator(values): + async for item in ensure_async_generator(_expand_scope_value(values, broadcast=False)): field_value = get_nested_dict_value(item, path or '', '') fields = {field: field_value} item_provenance = item.get('provenance', {'value': item}) if isinstance(item, dict) else {'value': item} @@ -661,7 +764,11 @@ async def _extract_fields_with_provenance(scope_specs): yield fields, provenance return - iterables = [values for _, values, _ in scope_specs] + # Multiple dimensions: zip with broadcasting for single values + # Avoid infinite iteration when all dimensions are single values. + broadcast = not all(_is_single_value(values) for _, values, _ in scope_specs) + iterables = [_expand_scope_value(values, broadcast=broadcast) for _, values, _ in scope_specs] + async for items in _async_zip_many(iterables): fields = {} provenance = {} @@ -697,18 +804,18 @@ def _resolve_scope_specs(scope, kwargs): if path_key in kwargs: scope_specs.setdefault( _normalize_scope_field_key(key), - {'values': value, 'path': kwargs[path_key]} + {'values': _normalize_scope_value(value), 'path': kwargs[path_key]} ) if 'STUDENTS' in kwargs: scope_specs.setdefault( 'student', - {'values': kwargs['STUDENTS'], 'path': kwargs.get('STUDENTS_path')} + {'values': _normalize_scope_value(kwargs['STUDENTS']), 'path': kwargs.get('STUDENTS_path')} ) if 'RESOURCES' in kwargs: scope_specs.setdefault( 'doc_id', - {'values': kwargs['RESOURCES'], 'path': kwargs.get('RESOURCES_path')} + {'values': _normalize_scope_value(kwargs['RESOURCES']), 'path': kwargs.get('RESOURCES_path')} ) unexpected_scope_keys = set(scope_specs.keys()) - allowed_scope_keys diff --git a/learning_observer/learning_observer/util.py b/learning_observer/learning_observer/util.py index 525d2c9a..c3b08512 100644 --- a/learning_observer/learning_observer/util.py +++ b/learning_observer/learning_observer/util.py @@ -146,7 +146,7 @@ def get_nested_dict_value(d, key_str=None, default=MissingType.Missing): key_str = '' keys = key_str.split('.') for key in keys: - if d is not None and key in d: + if isinstance(d, dict) and key in d: d = d[key] elif key == '': d = d