Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2026.02.02T20.54.51.168Z.c0f2a8f0.berickson.20260113.extension.tab.ids
0.1.0+2026.02.03T15.21.57.253Z.1310c89e.berickson.20260130.execution.dag.single.value
7 changes: 5 additions & 2 deletions docs/how-to/communication_protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ executor can build the right Redis keys.

**Preferred: `scope_fields` (supports arbitrary scopes)**

Use `scope_fields` to supply each scope axis with a `values` iterable and a `path`
into each item. The scope field names should match the reducer scope: `student`,
Use `scope_fields` to supply each scope axis with either a `values` iterable or a
single value (applied across all items), plus an optional `path` into each item.
The scope field names should match the reducer scope: `student`,
`doc_id`, `tab_id`, `page_id`, etc.

```python
Expand All @@ -113,6 +114,8 @@ reducer_keys = query.keys(
"student": {"values": query.variable("roster"), "path": "user_id"},
"doc_id": {"values": query.variable("documents"), "path": "doc_id"},
"tab_id": {"values": query.variable("tabs"), "path": "tab_id"},
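# Alternatively, pass a single value that is applied to every item.
# Use this in place of the "student" entry above, not alongside it
# (a repeated dict key would silently override the earlier one):
# "student": "bobs_user_id",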
},
)
```
Expand Down
2 changes: 1 addition & 1 deletion learning_observer/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2026.02.02T16.19.01.249Z.2ae8ac1a.berickson.20260120.comm.protocol.targets
0.1.0+2026.02.03T15.21.57.253Z.1310c89e.berickson.20260130.execution.dag.single.value
143 changes: 125 additions & 18 deletions learning_observer/learning_observer/communication_protocol/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,18 +602,125 @@ def _scope_field_candidates(field):
return []


# ==============================================================================
# Scope Value Handling
# ==============================================================================
#
# Scope fields can be specified as:
# 1. A single value (string, None, etc.) - broadcast across all items
# 2. An iterable of values - one per scope entry
# 3. An async iterable - same as above, but async
#
# SingleValue wraps case 1 to distinguish it from iterables.


class SingleValue:
    """Marker wrapper for one scope value shared by every scope item.

    Wrapping a value distinguishes "a single value to broadcast" from
    "an iterable supplying one value per scope entry". When keys are built
    across several scope dimensions, a wrapped value is repeated so it can
    be zipped against the finite iterables of the other dimensions.

    Example: ``student="bob_id"`` with ``documents=[doc1, doc2, doc3]``
    produces keys for ``(bob_id, doc1)``, ``(bob_id, doc2)`` and
    ``(bob_id, doc3)``.
    """

    # Only one attribute is ever stored; slots avoid a per-instance __dict__.
    __slots__ = ('value',)

    def __init__(self, value):
        # The wrapped payload; any object is accepted, including None.
        self.value = value

    def __repr__(self):
        return 'SingleValue({!r})'.format(self.value)


def _is_single_value(value):
    """Return True when ``value`` is an instance of ``SingleValue``."""
    wrapped = isinstance(value, SingleValue)
    return wrapped


def _normalize_scope_value(value):
    """Classify a scope field value as broadcast or per-item.

    Returns either the value unchanged or the value wrapped in
    ``SingleValue``:

    - an existing ``SingleValue`` or an async iterable is returned as is;
    - ``None``, ``str`` and ``bytes`` are wrapped (broadcast);
    - dicts and any other iterable are returned as is;
    - every remaining object (other scalars) is wrapped (broadcast).
    """
    # Checked first so that wrapped values and async sources are never
    # re-wrapped, whatever other types they might also satisfy.
    if _is_single_value(value) or isinstance(value, collections.abc.AsyncIterable):
        return value

    # Strings and bytes are iterable, but are meant as one scalar value.
    if value is None or isinstance(value, (str, bytes)):
        return SingleValue(value)

    # Dicts represent structured data, not a collection to wrap; other
    # iterables supply one item per scope entry.
    if isinstance(value, (dict, collections.abc.Iterable)):
        return value

    return SingleValue(value)


async def _repeat_forever(value):
    """Yield ``value`` over and over as an endless async generator.

    There is no termination condition here; whoever consumes the generator
    must stop iterating.
    """
    while True:
        yield value


def _expand_scope_value(value, *, broadcast=False):
    """Turn a normalized scope value into something that can be iterated.

    Args:
        value: A normalized scope value: either a ``SingleValue`` wrapper
            or an iterable (sync or async).
        broadcast: Only relevant for ``SingleValue``. When True the wrapped
            value is repeated endlessly so it can be zipped against other
            dimensions; when False it is produced exactly once.

    Returns:
        The input itself when it is not a ``SingleValue``; otherwise an
        endless async generator (``broadcast=True``) or a one-element list
        (``broadcast=False``) holding the wrapped value.
    """
    if not _is_single_value(value):
        return value

    payload = value.value
    if broadcast:
        return _repeat_forever(payload)
    return [payload]


def _parse_scope_spec(spec):
    """Split a scope specification into a ``(values, path)`` pair.

    Accepted shapes:
        roster                                 -> (roster, None)
        {"values": roster, "path": "user_id"}  -> (roster, "user_id")
        {"value": roster}                      -> (roster, None)

    For dict specs the value is read from the first key present among
    ``values``, ``value``, ``items`` and ``data``; the path comes from
    ``path``, falling back to ``value_path`` when ``path`` is missing or
    falsy. A dict with none of the value keys is treated as the value
    itself.

    Returns:
        Tuple of (values, path) where path may be None.
    """
    if isinstance(spec, dict):
        # Priority order matters: the first key found wins.
        value_key = next(
            (name for name in ('values', 'value', 'items', 'data') if name in spec),
            None,
        )
        if value_key is not None:
            return spec[value_key], spec.get('path') or spec.get('value_path')

    # Non-dict spec, or a dict without any recognized value key.
    return spec, None


def _normalize_scope_field_specs(raw_specs):
    """Normalize scope field specifications to a consistent format.

    Each raw spec is split into its values and path by
    ``_parse_scope_spec``; the values are then passed through
    ``_normalize_scope_value`` and the field name through
    ``_normalize_scope_field_key``.

    Args:
        raw_specs: Mapping of scope field name to a raw specification
            (a bare value, or a dict carrying the values and an optional
            path).

    Returns:
        Dict mapping normalized field names to {"values": ..., "path": ...}
    """
    normalized = {}
    for key, spec in raw_specs.items():
        normalized_key = _normalize_scope_field_key(key)
        # Single source of truth for spec parsing; no separate inline
        # extraction whose result would just be overwritten.
        values, path = _parse_scope_spec(spec)
        normalized[normalized_key] = {
            'values': _normalize_scope_value(values),
            'path': path,
        }
    return normalized
Expand All @@ -640,18 +747,14 @@ async def _async_zip_many(iterables):


async def _extract_fields_with_provenance(scope_specs):
'''Prepare the key field dictionary and provenance for each scope tuple.
The key field dictionary is used to create the key we are attempting
to fetch from the KVS (used later in `hack_handle_keys`). The passed in
`path` is used for setting the appropriate dictionary value.
The provenance is the current history of the communication protocol for each item.
'''
"""Prepare the key field dictionary and provenance for each scope tuple."""
if not scope_specs:
return

if len(scope_specs) == 1:
# Single dimension: simple iteration
field, values, path = scope_specs[0]
async for item in ensure_async_generator(values):
async for item in ensure_async_generator(_expand_scope_value(values, broadcast=False)):
field_value = get_nested_dict_value(item, path or '', '')
fields = {field: field_value}
item_provenance = item.get('provenance', {'value': item}) if isinstance(item, dict) else {'value': item}
Expand All @@ -661,7 +764,11 @@ async def _extract_fields_with_provenance(scope_specs):
yield fields, provenance
return

iterables = [values for _, values, _ in scope_specs]
# Multiple dimensions: zip with broadcasting for single values
# Avoid infinite iteration when all dimensions are single values.
broadcast = not all(_is_single_value(values) for _, values, _ in scope_specs)
iterables = [_expand_scope_value(values, broadcast=broadcast) for _, values, _ in scope_specs]

async for items in _async_zip_many(iterables):
Comment on lines 767 to 772

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Stop infinite scope tuple generation when all fields are single values

When there are multiple scope dimensions, single values are expanded with broadcast=True, which uses _repeat_forever and makes each dimension an infinite async iterator. If every scope field is a single value (e.g., scope_fields={"student": "bob", "doc_id": "doc1"} or STUDENTS="bob" + RESOURCES="doc1"), _async_zip_many never hits StopAsyncIteration, so _extract_fields_with_provenance yields tuples forever and the query will hang. This is a regression introduced by the new SingleValue broadcasting; the multi-dimension case needs a finite termination condition when all dimensions are singletons.

Useful? React with 👍 / 👎.

fields = {}
provenance = {}
Expand Down Expand Up @@ -697,18 +804,18 @@ def _resolve_scope_specs(scope, kwargs):
if path_key in kwargs:
scope_specs.setdefault(
_normalize_scope_field_key(key),
{'values': value, 'path': kwargs[path_key]}
{'values': _normalize_scope_value(value), 'path': kwargs[path_key]}
)

if 'STUDENTS' in kwargs:
scope_specs.setdefault(
'student',
{'values': kwargs['STUDENTS'], 'path': kwargs.get('STUDENTS_path')}
{'values': _normalize_scope_value(kwargs['STUDENTS']), 'path': kwargs.get('STUDENTS_path')}
)
if 'RESOURCES' in kwargs:
scope_specs.setdefault(
'doc_id',
{'values': kwargs['RESOURCES'], 'path': kwargs.get('RESOURCES_path')}
{'values': _normalize_scope_value(kwargs['RESOURCES']), 'path': kwargs.get('RESOURCES_path')}
)

unexpected_scope_keys = set(scope_specs.keys()) - allowed_scope_keys
Expand Down
2 changes: 1 addition & 1 deletion learning_observer/learning_observer/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def get_nested_dict_value(d, key_str=None, default=MissingType.Missing):
key_str = ''
keys = key_str.split('.')
for key in keys:
if d is not None and key in d:
if isinstance(d, dict) and key in d:
d = d[key]
elif key == '':
d = d
Expand Down
Loading