
Conversation

@hanna-paasivirta

@hanna-paasivirta hanna-paasivirta commented Sep 25, 2025

Description

This PR adds answer streaming via WebSocket for AI Assistants. This allows users to see status updates and a word-stream animation in real time, without having to wait for the complete response.

Closes #3585

Backend Changes

SSE Stream Client (lib/lightning/apollo_client/sse_stream.ex)

  • New module for Server-Sent Events streaming from Apollo
  • Uses Finch streaming HTTP client to maintain persistent
    connections to Apollo's /services/{service}/stream endpoints
  • Parses SSE event format and handles four event types from
    Apollo:
    • content_block_delta: Text chunks and thinking status from
      Anthropic streaming API
    • message_stop: Signals end of streaming content
    • complete: Final payload with usage stats, metadata, and
      generated code
    • error: Error messages from Apollo
  • Broadcasts events to Phoenix PubSub on the
    ai_session:#{session_id} topic for real-time delivery to
    the LiveView (see the sketch below)
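
As a rough, illustrative sketch of that parse-and-broadcast flow (the helper names, the PubSub server name Lightning.PubSub, and the payload shapes here are assumptions, not the module's actual code):

# Illustrative sketch only: fan SSE events out to the session topic.
defp broadcast_event(session_id, kind, data) do
  Phoenix.PubSub.broadcast(
    Lightning.PubSub,
    "ai_session:#{session_id}",
    {:ai_assistant, kind, Map.put(data, :session_id, session_id)}
  )
end

defp handle_sse_event(%{"type" => type} = event, session_id) do
  case type do
    # Text chunk / thinking status from Anthropic's streaming API
    "content_block_delta" ->
      broadcast_event(session_id, :streaming_chunk, %{content: get_in(event, ["delta", "text"])})

    # End of the streamed content
    "message_stop" ->
      broadcast_event(session_id, :streaming_complete, %{})

    # Final payload with usage stats, metadata and generated code
    "complete" ->
      broadcast_event(session_id, :streaming_payload_complete, %{payload: event["data"]})

    # Error reported by Apollo
    "error" ->
      broadcast_event(session_id, :streaming_error, %{error: event["data"]})

    # Ignore anything else (e.g. log events)
    _ ->
      :ok
  end
end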

Message Processor (lib/lightning/ai_assistant/message_processor.ex)

  • Modified to initiate SSE streaming for both job and workflow
    messages
  • stream_job_message/3 and start_workflow_streaming_request/3
    start SSE connections instead of waiting for HTTP response
  • Returns {:ok, :streaming} to keep the user message in the
    :processing state while streaming (see the sketch below)
  • Builds streaming payloads with lightning_session_id for
    PubSub routing
  • Constructs context (expression, adaptor, logs) and chat
    history for Apollo
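
A minimal sketch of that flow, assuming an argument list, a start_stream/2 entry point and a build_history/1 helper that are all illustrative rather than the actual implementation:

# Illustrative sketch only: start the SSE stream and return without blocking.
defp stream_job_message(session, message, context) do
  payload = %{
    content: message.content,
    context: context,                  # expression, adaptor, logs
    history: build_history(session),   # prior chat messages for Apollo
    lightning_session_id: session.id   # used for PubSub routing
  }

  {:ok, _pid} = Lightning.ApolloClient.SSEStream.start_stream("job_chat", payload)

  # The user message stays in :processing while chunks arrive over PubSub
  {:ok, :streaming}
end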

Edit LiveView (lib/lightning_web/live/workflow_live/edit.ex)

  • Added ai_assistant_registry to map session IDs to component
    IDs
  • handle_component_registration/2 subscribes to PubSub topic
    when component registers
  • handle_streaming_update/3 receives four streaming event
    types from PubSub and forwards them to the AI Assistant
    component via send_update (see the sketch below)
  • Routes events based on session ID lookup in registry
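
Roughly, the routing could look like the sketch below (the message tuple shape and the component module name are assumptions):

# Illustrative sketch only: forward a PubSub streaming event to the owning component.
def handle_info({:ai_assistant, event_type, %{session_id: session_id} = payload}, socket) do
  case Map.get(socket.assigns.ai_assistant_registry, session_id) do
    nil ->
      {:noreply, socket}

    component_id ->
      send_update(LightningWeb.AiAssistant.Component,
        id: component_id,
        streaming_event: {event_type, payload}
      )

      {:noreply, socket}
  end
end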

Frontend Changes

AI Assistant Component (lib/lightning_web/live/ai_assistant/component.ex)

  • Added four update/2 handlers for streaming events (see the
    sketch after this list):
    • streaming_chunk: Accumulates content in streaming_content
      assign
    • status_update: Updates streaming_status (e.g.,
      "Thinking...")
    • streaming_complete: Marks content streaming finished
    • streaming_payload_complete: Saves complete message with
      usage stats, metadata, and code
  • On payload complete, saves assistant message to database
    using AiAssistant.save_message/3
  • Updates pending user messages to :success status
  • Clears streaming state and invokes on_message_received
    callback
  • Template displays streaming content in real-time with status
    indicator
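
A rough sketch of what those handlers might look like (the streaming_event assign shape and helper names are assumptions, and the component's regular update/2 clause is omitted):

# Illustrative sketch only: accumulate chunks and track status in assigns.
def update(%{streaming_event: {:streaming_chunk, %{content: content}}}, socket) do
  accumulated = (socket.assigns[:streaming_content] || "") <> (content || "")
  {:ok, assign(socket, :streaming_content, accumulated)}
end

def update(%{streaming_event: {:status_update, %{status: status}}}, socket) do
  {:ok, assign(socket, :streaming_status, status)}
end

def update(%{streaming_event: {:streaming_complete, _payload}}, socket) do
  {:ok, assign(socket, :streaming_status, nil)}
end

def update(%{streaming_event: {:streaming_payload_complete, %{payload: payload}}}, socket) do
  # Persist the assistant message, mark pending user messages as :success,
  # then clear the transient streaming assigns.
  socket = save_streamed_message(socket, payload)
  {:ok, assign(socket, streaming_content: nil, streaming_status: nil)}
end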

StreamingText Hook (assets/js/hooks/index.ts)

  • New Phoenix Hook for smooth markdown rendering of streaming
    content
  • Re-parses accumulated content as markdown on each update
    using marked.js
  • Custom renderer matches backend Earmark styling (code
    blocks, links, headings, lists)
  • Handles split markdown tokens (e.g., split triple-backticks)
    by always parsing full accumulated string

ScrollToMessage Hook (assets/js/hooks/index.ts)

  • Enhanced to track user scroll position and respect manual
    scrolling
  • Only auto-scrolls to bottom when user is already at bottom
    (within 50px threshold)
  • Uses instant scroll during streaming updates to prevent jank
    from smooth scroll animations

Workflow AI Chat Component (lib/lightning_web/live/workflow_live/workflow_ai_chat_component.ex)

  • Modified on_message_received callback to conditionally push
    template_selected event
  • Only sends the event when code is present and non-empty,
    preventing empty template pushes (see the sketch below)
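
A minimal sketch of that guard (the helper name is illustrative):

# Illustrative sketch only: push template_selected only when code is present.
defp maybe_push_template(socket, code) do
  if is_binary(code) and String.trim(code) != "" do
    push_event(socket, "template_selected", %{template: code})
  else
    socket
  end
end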

Additional notes for the reviewer

This work is heavily AI-generated with Claude Code and may need careful review since I'm not familiar with Elixir.

It might be worth double-checking that we are processing the full payload as before (usage stats, logging to the database).

AI Usage

Please disclose how you've used AI in this work (it's cool, we just want to know!):

  • Code generation (copilot but not intellisense)
  • Learning or fact checking
  • Strategy / design
  • Optimisation / refactoring
  • Translation / spellchecking / doc gen
  • Other
  • I have not used AI

You can read more details in our Responsible AI Policy

Pre-submission checklist

  • I have performed a self-review of my code.
  • I have implemented and tested all related authorization policies. (e.g., :owner, :admin, :editor, :viewer)
  • I have updated the changelog.
  • I have ticked a box in "AI usage" in this PR

@github-project-automation github-project-automation bot moved this to New Issues in v2 Sep 25, 2025
@elias-ba
Contributor

elias-ba commented Oct 6, 2025

Hey @hanna-paasivirta! Really exciting to see streaming come to life in this PR; this is going to make the AI assistant feel so much more responsive 🤩.

I wanted to discuss the architectural decision around WebSockex. I see you chose to have Lightning connect to Apollo’s existing WebSocket server, which makes total sense given Apollo already has that infrastructure. Before we merge though, I think it’s worth considering a few different approaches and getting @josephjclark and @stuartc’s input on which direction feels best for the codebase long-term.

Option 1: Keep WebSockex (the current approach implemented in this PR)

What you’ve implemented:

Lightning (WebSockex client) → Apollo (WebSocket server) → Anthropic

Pros:

  • No Apollo changes needed
  • Apollo’s WebSocket infrastructure already exists and works
  • WebSockex is a mature library

Cons:

  • New external dependency in Lightning
  • Need to implement WebSocket client lifecycle (connecting, reconnecting, cleanup)
  • Need manual process supervision
  • Need manual timeout handling
  • Adds a lot more lines of WebSocket client management code

Option 2: HTTP Streaming with Tesla (this is probably the simplest approach)

What would change:

  • Add a streaming endpoint to Apollo (just a few lines of code)
  • Use Lightning’s existing Tesla client for streaming
# In Lightning, uses existing Apollo client pattern
defp stream_job_message(session, content, options) do
  on_event = fn event ->
    case event["type"] do
      "CHUNK" -> broadcast_chunk(session.id, event["data"])
      "STATUS" -> broadcast_status(session.id, event["data"])
      "complete" -> broadcast_complete(session.id)
    end
  end

  Lightning.ApolloClient.job_chat_stream(content, options, on_event)
  {:ok, :streaming}
end

In Apollo, add the streaming endpoint:

app.post(`${name}/stream`, async (ctx) => {
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();
      const onEvent = (type: string, data: any) => {
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify({ type, data })}\n\n`)
        );
      };

      const result = await callService(m, port, ctx.body, undefined, onEvent);
      controller.enqueue(
        encoder.encode(`data: ${JSON.stringify({ type: 'complete', data: result })}\n\n`)
      );
      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
});

Pros:

  • No new Lightning dependencies (uses existing Tesla client)
  • Follows existing Apollo client pattern (like job_chat/2, workflow_chat/2)
  • Simpler Lightning code
  • Tesla handles timeouts, errors, and retries automatically
  • Stateless. No connection management needed

Cons:

  • Requires Apollo changes
  • One-way only (can’t send commands mid-stream, though we don’t need that)

Option 3: Reverse the connection using Phoenix Channels (we already do this with the worker)

What would change:

  • Apollo connects to Lightning using Phoenix Channels
  • Reuses Lightning’s existing WebSocket server infrastructure (like WorkerChannel, RunChannel)
# In Lightning, follows existing channel pattern
defmodule LightningWeb.AiChannel do
  use Phoenix.Channel

  def join("ai_session:" <> session_id, _params, socket) do
    {:ok, assign(socket, :session_id, session_id)}
  end

  def handle_in("chunk", %{"content" => content}, socket) do
    Lightning.broadcast(
      "ai_session:#{socket.assigns.session_id}",
      {:ai_assistant, :streaming_chunk, %{content: content}}
    )
    {:noreply, socket}
  end

  # ... similar handlers for status, complete
end

In Apollo:

// Connect to Lightning’s channel (similar to how workers connect)
const ws = new WebSocket(`ws://lightning/ai/websocket?token=${apiKey}`);
ws.send(JSON.stringify({
  topic: `ai_session:${session_id}`,
  event: "phx_join",
  payload: {},
}));

const onEvent = (type, data) => {
  ws.send(JSON.stringify({
    topic: `ai_session:${session_id}`,
    event: type.toLowerCase(),
    payload: { [type.toLowerCase()]: data },
  }));
};

Pros:

  • No new Lightning dependencies
  • Reuses Lightning’s existing Phoenix Channel patterns
  • Phoenix handles supervision, reconnection, and backpressure automatically
  • Consistent with how workers already connect to Lightning
  • Bidirectional if we ever need it

Cons:

  • Requires Apollo changes
  • Apollo needs to manage the WebSocket client connection

My Recommendation

Option 2 (HTTP streaming) feels like the sweet spot. It requires minimal Apollo changes, keeps Lightning code simpler, and stays consistent with existing patterns.

But I'd love @josephjclark's and @stuartc's thoughts on:

  1. Is adding a new dependency (WebSockex) worth avoiding Apollo changes?
  2. Should we keep Apollo communication consistent (all via HTTP/Tesla)?
  3. Do we value bidirectional capability (WebSocket) even if we don’t need it now?

The current implementation works well, so if the team prefers to keep Apollo unchanged, Option 1 is totally viable. We’d just want to make sure the WebSocket client code has solid error handling, timeouts, and cleanup.

Thoughts?

@josephjclark
Collaborator

I'll set up a meet tomorrow, but let's absolutely design this around Lightning and minimize the Lightning deps!

@stuartc
Member

stuartc commented Oct 7, 2025

Echoing Elias here: there's little reason to expose WebSockets; HTTP streaming is a known quantity in chat APIs. There are several OpenAI-API-compatible libraries available for handling chat message streaming, with types/structs for working with individual messages like thinking/response etc.

To be clear, this is less about adding a dependency and more about using WebSockets and our own message format when there are existing APIs.

@josephjclark
Collaborator

Option 2 it is!

@josephjclark
Collaborator

Spoke to @stuartc and he thinks we should push on and switch to HTTP streaming now. So let's try this:

  • Implement the Anthropic message standard
  • Create new branches here and in apollo to switch over to HTTP streaming. Try and get that wrapped before EOW
  • Declare victory and move on to the next valiant endeavor

CC @hanna-paasivirta

Collaborator

@josephjclark josephjclark left a comment


Fantastic @hanna-paasivirta

Not particularly pretty while code suggestions are streaming in, but that's absolutely fine.

We need to hand this over to the Lightning team now to make any changes they need to the code, and to consider whether they want to make the streaming any prettier. I'd be tempted to stick with the quick win but it needs more discussion.

I'm heading over to the Apollo side to make sure things are ship-shape over there (obviously we can't release this until Apollo is done and live).

@hanna-paasivirta hanna-paasivirta marked this pull request as ready for review October 15, 2025 15:28
@josephjclark
Collaborator

We may have to think about error handling here. Without a timeout, if something goes wrong, it seems the user gets stuck with an outgoing message forever.

(screenshot)

Apollo went away and I can't get off this screen. I can't cancel, I can't retry, and it won't time out.

We should be able to tell if the stream was disconnected and should respond to that. We should also ensure that apollo returns errors when things go wrong so that we can handle the UI properly on this side.

@josephjclark
Collaborator

Something else I notice here is that my CPU goes absolutely berserk while streaming is active (even if not actually streaming tokens)

:ok

"error" ->
Logger.error("[SSEStream] Received error event: #{inspect(data)}")
Collaborator


Aha - we need to do something else here! When the error event comes through from Apollo, the chat message in the client still seems to be left hanging

We need to do the same Retry | Cancel stuff that happens in the regular post events.

Collaborator


I've reproduced this by adding a syntax error on the Python side (very lucky really!). You could also force Apollo to throw an error when it goes into job chat, which should trigger the same state.

I think there's a different case that needs to be handled elsewhere: what happens if Apollo disappears halfway through a stream? The client seems to hang, forever waiting for a response that won't come. You can repro that by sending a chat message, then killing the Apollo server.

Collaborator

@josephjclark josephjclark left a comment


Just found a big issue in error handling - I think we'll need help with this

@github-project-automation github-project-automation bot moved this from New Issues to In review in v2 Oct 17, 2025
elias-ba added a commit that referenced this pull request Oct 20, 2025
This commit implements comprehensive error handling, performance optimizations,
and test coverage for the AI Assistant streaming feature to address critical
production blockers identified in PR review.

## Phase 1: Error Handling & Recovery (CRITICAL)

**SSE Stream (lib/lightning/apollo_client/sse_stream.ex):**
- Add broadcast_error/2 to broadcast streaming errors to components
- Implement timeout supervision (Apollo timeout + 10s buffer)
- Add timeout_ref and completed state fields for lifecycle tracking
- Handle Finch connection failures with proper error messages
- Fix critical bug: add missing {:error, reason, _acc} pattern match
- Handle HTTP error status codes (4xx, 5xx) before parsing chunks
- Cancel timeouts on successful completion or error

**AI Assistant Component (lib/lightning_web/live/ai_assistant/component.ex):**
- Add streaming_error assign to track error state
- Implement handle_streaming_error/2 to mark messages as error
- Create streaming_error_state/1 component with retry/cancel UI
- Add retry_streaming event handler to resubmit messages
- Add cancel_streaming event handler to clear error state
- Show error UI conditionally in render_individual_session/1

**Edit LiveView (lib/lightning_web/live/workflow_live/edit.ex):**
- Add :streaming_error handler in handle_info/2
- Route streaming errors to components via send_update

**Message Processor (lib/lightning/ai_assistant/message_processor.ex):**
- Update find_pending_user_messages to include :processing status

**AI Assistant Context (lib/lightning/ai_assistant/ai_assistant.ex):**
- Allow finding messages in both :pending and :processing states

## Phase 2: Performance Optimization (HIGH)

**StreamingText Hook (assets/js/hooks/index.ts):**
- Add performance monitoring with parseCount and timing
- Implement 50ms debouncing to batch rapid chunk arrivals
- Add proper cleanup in destroyed() hook
- Reduce markdown parsing frequency during streaming

**ScrollToMessage Hook (assets/js/hooks/index.ts):**
- Implement throttle() helper function
- Throttle scroll position checks to max 100ms intervals
- Reduce CPU usage from excessive scroll calculations
- Add proper event listener cleanup

## Phase 3: Production Polish (MEDIUM)

**Logging Cleanup:**
- Change verbose Logger.info to Logger.debug in:
  - lib/lightning/apollo_client/sse_stream.ex (9 changes)
  - lib/lightning/ai_assistant/message_processor.ex (6 changes)
  - lib/lightning_web/live/ai_assistant/component.ex (7 changes)
- Keep Logger.error for errors and Logger.info for key events

**Documentation (CHANGELOG.md):**
- Add comprehensive entry for AI Assistant Streaming feature
- Document user-facing features and technical implementation

## Test Coverage

**New Tests (24 tests, all passing):**

test/lightning/apollo_client/sse_stream_test.exs (9 tests):
- GenServer lifecycle and initialization
- Error event parsing and broadcasting
- Timeout handling
- Connection failure detection
- HTTP error responses
- Content chunk broadcasting
- Status update broadcasting
- Completion events
- Complete payload with metadata

test/lightning_web/live/workflow_live/ai_assistant_component_test.exs (3 tests):
- SSEStream error message formats
- Apollo JSON error parsing
- Component retry/cancel handler verification

## Bug Fixes

- Fix CaseClauseError in SSEStream when Finch returns {:error, reason, acc}
- This pattern occurs on connection refused before any HTTP response
- All 9 SSE stream tests now pass (previously 5/9 failing)

## Impact

Addresses critical production blockers from PR #3607 review:
1. ✅ Error handling gap - messages no longer stuck in processing state
2. ✅ CPU performance spike - debouncing and throttling reduce overhead
3. ✅ Missing error state transitions - full error → retry/cancel flow

## Manual Testing Required

Before merge, verify:
- Stream timeout → error UI → retry works
- Kill Apollo mid-stream → error UI appears
- Invalid code → Apollo error displays correctly
- CPU usage during streaming is acceptable

Co-authored-by: Claude <noreply@anthropic.com>
@elias-ba elias-ba force-pushed the ai-streaming branch 6 times, most recently from 0837fa0 to 00a7afd on October 21, 2025 at 00:15
@codecov

codecov bot commented Oct 21, 2025

Codecov Report

❌ Patch coverage is 94.09594% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.88%. Comparing base (334fb9a) to head (2e7eca1).
⚠️ Report is 180 commits behind head on main.

Files with missing lines                               Patch %   Lines
lib/lightning_web/live/ai_assistant/component.ex       92.85%    7 Missing ⚠️
lib/lightning_web/live/workflow_live/edit.ex           72.22%    5 Missing ⚠️
lib/lightning/ai_assistant/message_processor.ex        96.00%    2 Missing ⚠️
lib/lightning/apollo_client/sse_stream.ex              98.97%    1 Missing ⚠️
...b/live/workflow_live/workflow_ai_chat_component.ex  80.00%    1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3607      +/-   ##
==========================================
+ Coverage   88.66%   88.88%   +0.21%     
==========================================
  Files         414      415       +1     
  Lines       17942    18206     +264     
==========================================
+ Hits        15909    16183     +274     
+ Misses       2033     2023      -10     

☔ View full report in Codecov by Sentry.

@elias-ba
Contributor

elias-ba commented Oct 21, 2025

Hey guys, I picked up where Hanna left off and wrapped up the remaining work on this PR. She laid down a really solid foundation with the SSE streaming implementation, which made it much easier to finish things up.

What I focused on

Test reliability: The test suite was a bit flaky, so I spent some time making it rock solid. Removed all the Process.sleep calls (we had about 565ms of arbitrary waits scattered around) and replaced them with proper synchronization using Oban.drain_queue, assert_receive, and the Eventually library for polling. All 61 tests passing now and they run faster too.
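
For reference, this is roughly the shape of the synchronization that replaced the sleeps (the queue name and message shape below are illustrative, not the exact test code):

# Subscribe before the work runs so the broadcast reaches the test process.
Phoenix.PubSub.subscribe(Lightning.PubSub, "ai_session:#{session.id}")

# Run the enqueued MessageProcessor job synchronously instead of sleeping.
Oban.drain_queue(queue: :ai_assistant)

# Wait for the actual PubSub event with a bounded timeout.
assert_receive {:ai_assistant, :streaming_chunk, %{content: _chunk}}, 1_000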

Error handling coverage: Added comprehensive test coverage for all the error scenarios Joe raised - timeouts, connection failures, HTTP errors, Apollo server disconnections. The timeout supervision is working as expected (Apollo timeout + 10s buffer), and all failure modes properly broadcast errors to the UI.

Streaming simulation helpers: Built out test/support/ai_assistant_helpers.ex so we can simulate SSE streaming in tests without actually hitting Apollo. Makes tests way more reliable and faster.

The timeout/error handling story

Joe's concern about messages getting stuck forever is addressed - every stream has supervised timeout, and all error types (connection failures, HTTP errors, timeouts) broadcast :streaming_error events that the LiveView picks up and shows to users with retry/cancel options.
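
Roughly, the supervised timeout works like this (the values and names below are illustrative, not the exact implementation):

# Illustrative sketch only: arm a timer when the stream starts; fail loudly if it fires.
def init(opts) do
  timeout = Keyword.get(opts, :timeout, 60_000) + 10_000   # Apollo timeout + 10s buffer
  timeout_ref = Process.send_after(self(), :stream_timeout, timeout)

  {:ok, %{session_id: opts[:session_id], timeout_ref: timeout_ref, completed: false}}
end

def handle_info(:stream_timeout, %{completed: false} = state) do
  broadcast_error(state.session_id, "The AI response timed out. Please try again.")
  {:stop, :normal, state}
end

def handle_info(:stream_timeout, state), do: {:noreply, state}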

Performance question

@josephjclark - you mentioned CPU going crazy during streaming. I haven't dug into that yet since I was focused on test reliability. Could you test with the latest commits and let me know if it's still an issue? If so, I can profile it and see what's causing the re-render storm. My guess is we might need to batch chunk updates or add some memoization.

Please test!

Would love for folks to give this a spin and let me know if you bump into any weirdness. The error scenarios should all be handled now, but real-world testing always surfaces interesting edge cases.

Thanks @hanna-paasivirta for the great groundwork on this! 🙌

@elias-ba
Contributor

I noticed a small delay with messages that have changes to the workflow's YAML. The message is rendered first, then the workflow canvas is updated. Sometimes the delay is as long as half a second or a second. I am not sure whether this is due to the streaming events or the canvas rendering, but I think it's worth paying attention to. I haven't done anything to fix it yet. cc @josephjclark @hanna-paasivirta: can you check it out and let me know if you think it's bad enough to invest some time on it?

Replace Process.sleep with proper synchronization patterns for more
reliable and faster tests.

Changes:
- Use Oban.drain_queue for synchronous job execution
- Use assert_receive to wait for PubSub message arrival
- Use Eventually.eventually to poll for LiveView state updates
- Remove all Process.sleep calls from test helpers

This eliminates race conditions and arbitrary delays, making tests both
faster and more reliable by waiting for actual state changes rather than
fixed time periods.

All 61 AI Assistant tests passing.

Added comprehensive tests for all error handling branches and edge cases:
- Timeout handling (both active and completed streams)
- Connection errors (timeout, closed, shutdown, econnrefused)
- JSON parsing errors for complete and error events
- Unhandled event types (log, unknown events)
- Error message format variations

Coverage improved from 58% to 92% for sse_stream.ex.
All 72 AI assistant tests passing.

Add comprehensive tests for streaming error handling:
- Test retry_streaming event handler (passing)
- Test cancel_streaming event handler (passing)
- Test streaming error UI rendering (passing)

Implemented Finch mocking to prevent real SSE connection attempts
during tests. The mock blocks indefinitely allowing simulated errors
to be broadcast without interference from connection failures.

Key changes:
- Added Finch to Mimic.copy in test_helper.exs
- Updated stub_online to mock Finch.stream with global mode
- Used eventually blocks to click buttons while UI is visible
- Tests now fully cover streaming error branches in component.ex

Move Mimic.set_mimic_global() from stub_online helper to individual
tests that need it. This prevents global mode from affecting other
tests that use Mimic.expect.

The global mode is only needed for tests that call create_session
which spawns Oban worker processes that need access to Finch mocks.

Move Finch.stream stub from stub_online to separate stub_finch_streaming
function. This prevents the stub from affecting SSEStream tests which
expect real Finch behavior (connection failures).

Key changes:
- Created stub_finch_streaming() helper function
- AI assistant tests explicitly call stub_finch_streaming()
- SSEStream test rejects Finch stubs in setup to ensure real behavior
- All 3587 tests now pass with 0 failures
Collaborator

@josephjclark josephjclark left a comment


One QA note: when text streams in, we don't auto-scroll down the page to follow the latest characters. I suppose you can debate the correct behaviour there. I'm happy enough for now; I think we can prettify later when we've got a few less pressing things to do.

:closed -> "Connection closed unexpectedly"
{:shutdown, _} -> "Server shut down"
{:http_error, status} -> "Server returned error status #{status}"
_ -> "Connection error: #{inspect(reason)}"
Collaborator


This looks like this:

(screenshot)

Are you able to just print that reason string?

@josephjclark
Collaborator

Another QA note: I expect this happens on main too. But if a message fails, I get that nice error message. If I hit cancel and then refresh, I just get an infinite "Processing". I see this on some of my older chat errors too. I'd be happy to spin this out as an issue - but since I've seen it I want to record it.

@josephjclark
Collaborator

Re workflow chat: yeah the delay is pretty annoying. I want to pass it but I suspect users are going to be annoyed by this.

What's the effort in putting the workflow stream behind a control switch, defaulting to false, and just using a regular POST? So we support both, optionally, but disable in prod.

Then we can raise an issue to either: 1) show a "loading" overlay on the diagram while waiting for the YAML to come back, or 2) modify the streaming in Apollo so that the workflow is streamed down before the chat (with a nice status). I prefer 2.

@josephjclark
Collaborator

@elias-ba My CPU seems way calmer while testing today. Not sure if it was just a bad time the other day or if you've fixed something? Seems great now!

@josephjclark
Collaborator

One more thing to mention, I think: while testing locally I did see a timeout in workflow chat. I have merged the timeout increase on the Apollo side. I was surprised by this though: why should we see timeouts on streaming? There should be a heartbeat keeping the message alive.

Maybe we just need to bump the timeout even higher?

@josephjclark josephjclark added the AI label Dec 8, 2025