Skip to content

Conversation

@g-valentino
Copy link

At the momemnt workflow generation looks good, but some issues with topics when publishing to output topic.

AssertionError
(.venv) gevalentino@DESKTOP-8SNURVT:~/repos/graphite$ python tests/workflow/test_workflow_decorators.py 
oi_span_type=<OpenInferenceSpanKindValues.AGENT: 'AGENT'> workflow_id='f138d1f963e94f13ba7dd3aa1cbc95bf' name='MultiNodeWorkflow' type='MultiNodeWorkflow' nodes={'bye': CallableNode(node_id='ab5a2a17d1a345c19ef4379fa357576f', name='bye', type='CallableToolNode', tool=CallableTool(tool_id='f6c9b557f9fb4cad994c89211abe5772', name='CallableTool', type='CallableTool', oi_span_type=<OpenInferenceSpanKindValues.TOOL: 'TOOL'>), oi_span_type=<OpenInferenceSpanKindValues.AGENT: 'AGENT'>, subscribed_expressions=[TopicExpr(topic=Topic(name='foo_bar_topic', type=<TopicType.DEFAULT_TOPIC_TYPE: 'Topic'>, condition=<function workflow.<locals>.generate.<locals>.<lambda> at 0x7e3904e8c680>, event_cache=<grafi.common.topics.topic_event_cache.TopicEventCache object at 0x7e3904e80830>, publish_event_handler=<bound method EventDrivenWorkflow.on_event of MultiNodeWorkflow(oi_span_type=<OpenInferenceSpanKindValues.AGENT: 'AGENT'>, workflow_id='f138d1f963e94f13ba7dd3aa1cbc95bf', name='MultiNodeWorkflow', type='MultiNodeWorkflow', nodes={...})>))], publish_to=[OutputTopic(name='output_topic', type=<TopicType.AGENT_OUTPUT_TOPIC_TYPE: 'AgentOutputTopic'>, condition=<function workflow.<locals>.generate.<locals>.<lambda> at 0x7e3904e8c4a0>, event_cache=<grafi.common.topics.topic_event_cache.TopicEventCache object at 0x7e3904e80560>, publish_event_handler=<bound method EventDrivenWorkflow.on_event of MultiNodeWorkflow(oi_span_type=<OpenInferenceSpanKindValues.AGENT: 'AGENT'>, workflow_id='f138d1f963e94f13ba7dd3aa1cbc95bf', name='MultiNodeWorkflow', type='MultiNodeWorkflow', nodes={...})>)]), 'hello': CallableNode(node_id='e03362f9e8f6404eb888f7c78f2c710a', name='hello', type='CallableToolNode', tool=CallableTool(tool_id='88fecc91c2b1465d9d486f4510f882a8', name='CallableTool', type='CallableTool', oi_span_type=<OpenInferenceSpanKindValues.TOOL: 'TOOL'>), oi_span_type=<OpenInferenceSpanKindValues.AGENT: 'AGENT'>, subscribed_expressions=[TopicExpr(topic=InputTopic(name='input_topic', type=<TopicType.AGENT_INPUT_TOPIC_TYPE: 'AgentInputTopic'>, condition=<function workflow.<locals>.generate.<locals>.<lambda> at 0x7e3904e8c5e0>, event_cache=<grafi.common.topics.topic_event_cache.TopicEventCache object at 0x7e3904e806b0>, publish_event_handler=<bound method EventDrivenWorkflow.on_event of MultiNodeWorkflow(oi_span_type=<OpenInferenceSpanKindValues.AGENT: 'AGENT'>, workflow_id='f138d1f963e94f13ba7dd3aa1cbc95bf', name='MultiNodeWorkflow', type='MultiNodeWorkflow', nodes={...})>))], publish_to=[Topic(name='foo_bar_topic', type=<TopicType.DEFAULT_TOPIC_TYPE: 'Topic'>, condition=<function workflow.<locals>.generate.<locals>.<lambda> at 0x7e3904e8c680>, event_cache=<grafi.common.topics.topic_event_cache.TopicEventCache object at 0x7e3904e80830>, publish_event_handler=<bound method EventDrivenWorkflow.on_event of MultiNodeWorkflow(oi_span_type=<OpenInferenceSpanKindValues.AGENT: 'AGENT'>, workflow_id='f138d1f963e94f13ba7dd3aa1cbc95bf', name='MultiNodeWorkflow', type='MultiNodeWorkflow', nodes={...})>)])}
2025-09-14 15:28:39.079 | WARNING  | grafi.common.containers.container:event_store:48 - Using EventStoreInMemory. This is ONLY suitable for local testing but not for production.
2025-09-14 15:28:39.079 | INFO     | grafi.common.instrumentations.tracing:setup_tracing:274 - Trying auto-detection for tracing at localhost:4317
2025-09-14 15:28:39.079 | INFO     | grafi.common.instrumentations.tracing:_setup_auto_tracing:189 - Trying default collector at localhost:4317
2025-09-14 15:28:39.080 | DEBUG    | grafi.common.instrumentations.tracing:is_local_endpoint_available:54 - Endpoint check failed for localhost:4317 - [Errno 111] Connection refused
2025-09-14 15:28:39.080 | DEBUG    | grafi.common.instrumentations.tracing:_setup_in_memory_tracing:216 - Using in-memory tracing (no external endpoint available)
PUBLUSHING
ADDING EVENTevent_id='e60d82617b4a43eda1899ab460f71992' event_version='1.0' invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}) event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'> timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 78822, tzinfo=datetime.timezone.utc) name='input_topic' type=<TopicType.AGENT_INPUT_TOPIC_TYPE: 'AgentInputTopic'> offset=-1 data=[Message(name=None, message_id='18398584ee934bb1866f5e53e8f02f6a', timestamp=1757888919078797840, content='Test message', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)] consumed_event_ids=[] publisher_name='MultiNodeWorkflow' publisher_type='MultiNodeWorkflow'
APPENDED!
hello
In hello 2
foo_bar_topic
event_id='e12e2e21e22b4b9cb5252c59dbdcd233' event_version='1.0' invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}) event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'> timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 81603, tzinfo=datetime.timezone.utc) name='' type=<TopicType.NONE_TOPIC_TYPE: 'NoneTopic'> offset=-1 data=[Message(name=None, message_id='a6dd0039f28a4986b049f4377b874a39', timestamp=1757888919081593009, content='Got test message', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)] consumed_event_ids=['83013981dce8423f8df5f47bc4a8dc5b'] publisher_name='hello' publisher_type='CallableToolNode'
PUBLUSHING
ADDING EVENTevent_id='e12e2e21e22b4b9cb5252c59dbdcd233' event_version='1.0' invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}) event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'> timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 81603, tzinfo=datetime.timezone.utc) name='foo_bar_topic' type=<TopicType.DEFAULT_TOPIC_TYPE: 'Topic'> offset=-1 data=[Message(name=None, message_id='a6dd0039f28a4986b049f4377b874a39', timestamp=1757888919081593009, content='Got test message', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)] consumed_event_ids=['83013981dce8423f8df5f47bc4a8dc5b'] publisher_name='hello' publisher_type='CallableToolNode'
APPENDED!
event_id='e12e2e21e22b4b9cb5252c59dbdcd233' event_version='1.0' invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}) event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'> timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 81603, tzinfo=datetime.timezone.utc) name='foo_bar_topic' type=<TopicType.DEFAULT_TOPIC_TYPE: 'Topic'> offset=0 data=[Message(name=None, message_id='a6dd0039f28a4986b049f4377b874a39', timestamp=1757888919081593009, content='Got test message', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)] consumed_event_ids=['83013981dce8423f8df5f47bc4a8dc5b'] publisher_name='hello' publisher_type='CallableToolNode'
[PublishToTopicEvent(event_id='e12e2e21e22b4b9cb5252c59dbdcd233', event_version='1.0', invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}), event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'>, timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 81603, tzinfo=datetime.timezone.utc), name='foo_bar_topic', type=<TopicType.DEFAULT_TOPIC_TYPE: 'Topic'>, offset=0, data=[Message(name=None, message_id='a6dd0039f28a4986b049f4377b874a39', timestamp=1757888919081593009, content='Got test message', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)], consumed_event_ids=['83013981dce8423f8df5f47bc4a8dc5b'], publisher_name='hello', publisher_type='CallableToolNode')]
bye
In bye 3
output_topic
event_id='1b5ebd7dc1c14cdc91dddb479452ee40' event_version='1.0' invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}) event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'> timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 82303, tzinfo=datetime.timezone.utc) name='' type=<TopicType.NONE_TOPIC_TYPE: 'NoneTopic'> offset=-1 data=[Message(name=None, message_id='ba1c9b2076e646009632bfb76aafb6dd', timestamp=1757888919082293360, content='hi', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)] consumed_event_ids=['cae5b8c320124e82bd355e31174ae593'] publisher_name='bye' publisher_type='CallableToolNode'
PUBLUSHING
ADDING EVENTevent_id='1b5ebd7dc1c14cdc91dddb479452ee40' event_version='1.0' invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}) event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'> timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 82303, tzinfo=datetime.timezone.utc) name='output_topic' type=<TopicType.AGENT_OUTPUT_TOPIC_TYPE: 'AgentOutputTopic'> offset=-1 data=[Message(name=None, message_id='ba1c9b2076e646009632bfb76aafb6dd', timestamp=1757888919082293360, content='hi', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)] consumed_event_ids=['cae5b8c320124e82bd355e31174ae593'] publisher_name='bye' publisher_type='CallableToolNode'
APPENDED!
event_id='1b5ebd7dc1c14cdc91dddb479452ee40' event_version='1.0' invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}) event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'> timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 82303, tzinfo=datetime.timezone.utc) name='output_topic' type=<TopicType.AGENT_OUTPUT_TOPIC_TYPE: 'AgentOutputTopic'> offset=0 data=[Message(name=None, message_id='ba1c9b2076e646009632bfb76aafb6dd', timestamp=1757888919082293360, content='hi', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)] consumed_event_ids=['cae5b8c320124e82bd355e31174ae593'] publisher_name='bye' publisher_type='CallableToolNode'
[PublishToTopicEvent(event_id='1b5ebd7dc1c14cdc91dddb479452ee40', event_version='1.0', invoke_context=InvokeContext(conversation_id='conversation_id', invoke_id='a90b13562f1a428dbf56a911f1e3f983', assistant_request_id='1c14dd089dca493ca060c7d650b6500a', user_id='', kwargs={}), event_type=<EventType.PUBLISH_TO_TOPIC: 'PublishToTopic'>, timestamp=datetime.datetime(2025, 9, 14, 22, 28, 39, 82303, tzinfo=datetime.timezone.utc), name='output_topic', type=<TopicType.AGENT_OUTPUT_TOPIC_TYPE: 'AgentOutputTopic'>, offset=0, data=[Message(name=None, message_id='ba1c9b2076e646009632bfb76aafb6dd', timestamp=1757888919082293360, content='hi', refusal=None, annotations=None, audio=None, role='user', tool_call_id=None, tools=None, function_call=None, tool_calls=None, is_streaming=False)], consumed_event_ids=['cae5b8c320124e82bd355e31174ae593'], publisher_name='bye', publisher_type='CallableToolNode')]
2025-09-14 15:28:39.082 | INFO     | grafi.workflows.workflow:stop:41 - Workflow stop requested
2025-09-14 15:28:39.082 | INFO     | grafi.workflows.impl.event_driven_workflow:_invoke_node:527 - Node bye was cancelled
2025-09-14 15:28:39.082 | INFO     | grafi.workflows.impl.event_driven_workflow:_invoke_node:527 - Node hello was cancelled
Traceback (most recent call last):
  File "/home/gevalentino/repos/graphite/tests/workflow/test_workflow_decorators.py", line 117, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/gevalentino/repos/graphite/tests/workflow/test_workflow_decorators.py", line 114, in main
    assert(has_messages)
           ^^^^^^^^^^^^
AssertionError

The message is published to output topic, but its not observed in the test.
When in the same spot definig SingleNodeWorkflow and MultiNodeWorklow, the second invocation doesnt work properly.

Need to check if there is some global state (there should never be one) that is preserved across workflows.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants