From 900686101b11b8d5d677d490427a61912e97b60d Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 8 Dec 2025 07:53:01 +0000 Subject: [PATCH] Refactor: Introduce AgentService and simplify MiniAgent interface This commit introduces the AgentService as a central abstraction for interacting with agents. It simplifies the MiniAgent interface by renaming `events` to `tapEventStream` and `getReducedContext` to `getState`. This change improves code organization and testability. Co-authored-by: jonas --- docs/actor-implementation-sketch.ts | 3 +- docs/architecture.md | 2 +- docs/design.ts | 7 +- src/agent-index.ts | 1 + src/agent-service-http.ts | 136 ++++++++++++++ src/agent-service.ts | 79 +++++++++ src/cli/chat-ui.ts | 81 ++++----- src/cli/commands.ts | 134 +++++++++----- src/cli/components/opentui-chat.tsx | 10 +- src/cli/event-context.ts | 29 +++ src/cli/main.ts | 7 +- src/domain.ts | 9 +- src/http-routes.ts | 263 +++++++++++++++++++++++++--- src/layercode/layercode.adapter.ts | 19 +- src/mini-agent.ts | 18 +- src/server.ts | 5 +- test/cli.e2e.test.ts | 23 +++ test/mini-agent.test.ts | 28 +-- test/tty.e2e.test.ts | 44 ++++- 19 files changed, 737 insertions(+), 161 deletions(-) create mode 100644 src/agent-service-http.ts create mode 100644 src/agent-service.ts create mode 100644 src/cli/event-context.ts diff --git a/docs/actor-implementation-sketch.ts b/docs/actor-implementation-sketch.ts index a1cd194..e03de6a 100644 --- a/docs/actor-implementation-sketch.ts +++ b/docs/actor-implementation-sketch.ts @@ -284,7 +284,8 @@ const makeMiniAgent = (agentName: AgentName) => * For tests: subscribe before adding events, or use Deferred for coordination. * ```typescript * const ready = yield* Deferred.make() - * const fiber = yield* agent.events.pipe( + * const stream = yield* agent.tapEventStream + * const fiber = yield* stream.pipe( * Stream.tap(() => Deferred.succeed(ready, void 0)), * Stream.runCollect, Effect.fork * ) diff --git a/docs/architecture.md b/docs/architecture.md index 7f76efe..c9fc488 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -25,7 +25,7 @@ Reference: **[design.ts](./design.ts)** for complete service interfaces and type │ addEvent(e) → persist → add to events → reduce → Mailbox.offer │ │ │ │ │ BROADCAST: ▼ │ -│ agent.events ◀── Stream.broadcastDynamic ◀──────────┘ │ +│ agent.tapEventStream ◀── Stream.broadcastDynamic ◀──┘ │ │ │ │ │ PROCESSING (when event.triggersAgentTurn=true): │ │ └──▶ debounce(100ms) ──▶ MiniAgentTurn.execute(ctx) │ diff --git a/docs/design.ts b/docs/design.ts index 7b1f07e..54f6ae2 100644 --- a/docs/design.ts +++ b/docs/design.ts @@ -416,13 +416,13 @@ export interface MiniAgent { readonly addEvent: (event: ContextEvent) => Effect.Effect /** LIVE event stream - each call creates new subscriber. Late subscribers miss history. */ - readonly events: Stream.Stream + readonly tapEventStream: Effect.Effect, never, Scope.Scope> /** Get all events from in-memory state (for historical events) */ readonly getEvents: Effect.Effect> /** Get current derived state */ - readonly getReducedContext: Effect.Effect + readonly getState: Effect.Effect /** Gracefully shutdown: complete in-flight work, emit SessionEndedEvent, close streams */ readonly shutdown: Effect.Effect @@ -461,7 +461,8 @@ export const sampleProgram = Effect.gen(function*() { const agent = yield* registry.getOrCreate(agentName) // Subscribe to event stream - const streamFiber = yield* agent.events.pipe( + const eventStream = yield* agent.tapEventStream + const streamFiber = yield* eventStream.pipe( Stream.tap((event) => Effect.log(`Event: ${event._tag}`)), Stream.runDrain, Effect.fork diff --git a/src/agent-index.ts b/src/agent-index.ts index 2a3bb59..4267bc4 100644 --- a/src/agent-index.ts +++ b/src/agent-index.ts @@ -50,6 +50,7 @@ export { // Services export { AgentRegistry } from "./agent-registry.ts" +export { AgentService } from "./agent-service.ts" export { EventReducer } from "./event-reducer.ts" export { EventStore } from "./event-store.ts" diff --git a/src/agent-service-http.ts b/src/agent-service-http.ts new file mode 100644 index 0000000..96b6e7f --- /dev/null +++ b/src/agent-service-http.ts @@ -0,0 +1,136 @@ +import { Clock, Effect, Layer, Schema, Scope, Stream } from "effect" +import { AgentService, type AgentServiceApi } from "./agent-service.ts" +import { EventReducer } from "./event-reducer.ts" +import { + type AgentName, + type ContextEvent, + ContextEvent as ContextEventSchema +} from "./domain.ts" + +const encodeEvent = Schema.encodeSync(ContextEventSchema) +const decodeEvent = Schema.decodeUnknown(ContextEventSchema) + +const jsonHeaders = { "Content-Type": "application/json" } + +const sanitizeBaseUrl = (url: string) => url.endsWith("/") ? url.slice(0, -1) : url + +const parseEventsResponse = (data: unknown): Effect.Effect> => + Effect.sync(() => { + if (typeof data !== "object" || data === null || !("events" in data)) { + throw new Error("Invalid events payload") + } + const eventsData = (data as { events: unknown }).events + if (!Array.isArray(eventsData)) { + throw new Error("Events field must be an array") + } + return eventsData + }).pipe( + Effect.flatMap((eventsData) => + Effect.forEach(eventsData, (entry) => decodeEvent(entry), { concurrency: "unbounded" }) + ) + ) + +export const makeHttpAgentServiceLayer = (baseUrl: string) => { + const normalizedBase = sanitizeBaseUrl(baseUrl) + + const fetchJson = (path: string, init?: RequestInit) => + Effect.tryPromise({ + try: async () => { + const response = await fetch(`${normalizedBase}${path}`, init) + if (!response.ok) { + const message = await response.text().catch(() => response.statusText) + throw new Error(`HTTP ${response.status}: ${message}`) + } + if (response.status === 204) { + return null + } + return response.json() + }, + catch: (error) => (error instanceof Error ? error : new Error(String(error))) + }) + + const postJson = (path: string, body?: unknown) => + fetchJson(path, { + method: "POST", + headers: jsonHeaders, + body: body === undefined ? undefined : JSON.stringify(body) + }) + + return Layer.effect( + AgentService, + Effect.gen(function*() { + const reducer = yield* EventReducer + + const getEvents = ({ agentName }: { readonly agentName: AgentName }) => + fetchJson(`/agent/${agentName}/events/history`).pipe( + Effect.flatMap(parseEventsResponse) + ) + + const addEvents = ({ agentName, events }: { readonly agentName: AgentName; readonly events: ReadonlyArray }) => + postJson(`/agent/${agentName}/events`, { + events: events.map((event) => encodeEvent(event)), + streamUntilIdle: false + }).pipe(Effect.asVoid) + + const endSession = ({ agentName }: { readonly agentName: AgentName }) => + postJson(`/agent/${agentName}/end-session`, undefined).pipe(Effect.asVoid) + + const interruptTurn = ({ agentName }: { readonly agentName: AgentName }) => + postJson(`/agent/${agentName}/interrupt`, undefined).pipe(Effect.asVoid) + + const isIdle = ({ agentName }: { readonly agentName: AgentName }) => + fetchJson(`/agent/${agentName}/idle`).pipe( + Effect.map((data) => + typeof data === "object" && data !== null && "idle" in data ? Boolean((data as { idle: unknown }).idle) : false + ) + ) + + const getState = ({ agentName }: { readonly agentName: AgentName }) => + getEvents({ agentName }).pipe( + Effect.flatMap((events) => reducer.reduce(reducer.initialReducedContext, events)) + ) + + const tapEventStream: AgentServiceApi["tapEventStream"] = ({ agentName }) => + Effect.gen(function*() { + const existingEvents = yield* getEvents({ agentName }) + let lastEventId = existingEvents.length > 0 ? existingEvents[existingEvents.length - 1]!.id : null + + return Stream.asyncScoped((emit) => + Effect.gen(function*() { + let cancelled = false + yield* Effect.addFinalizer(() => Effect.sync(() => { cancelled = true })) + + while (!cancelled) { + yield* Effect.sleep("200 millis") + const events = yield* getEvents({ agentName }) + if (events.length === 0) { + continue + } + if (!lastEventId) { + lastEventId = events[events.length - 1]!.id + continue + } + const lastIndex = events.findIndex((event) => event.id === lastEventId) + const startIndex = lastIndex === -1 ? events.length : lastIndex + 1 + for (let idx = startIndex; idx < events.length; idx++) { + const event = events[idx]! + lastEventId = event.id + yield* emit.single(event) + } + } + }) + ) + }) + + return { + addEvents, + tapEventStream, + getEvents, + getState, + endSession, + interruptTurn, + isIdle + } satisfies AgentServiceApi + }) + ) +} diff --git a/src/agent-service.ts b/src/agent-service.ts new file mode 100644 index 0000000..e2102dc --- /dev/null +++ b/src/agent-service.ts @@ -0,0 +1,79 @@ +import { Clock, Context, Effect, Layer, Scope, Stream } from "effect" +import { AgentRegistry } from "./agent-registry.ts" +import { type AgentName, type ContextEvent, type MiniAgent, type ReducedContext } from "./domain.ts" + +export interface AddEventsInput { + readonly agentName: AgentName + readonly events: ReadonlyArray +} + +export interface TapEventStreamInput { + readonly agentName: AgentName +} + +export interface GetEventsInput { + readonly agentName: AgentName +} + +export interface AgentServiceApi { + readonly addEvents: (input: AddEventsInput) => Effect.Effect + readonly tapEventStream: ( + input: TapEventStreamInput + ) => Effect.Effect, never, Scope.Scope | Clock.Clock> + readonly getEvents: (input: GetEventsInput) => Effect.Effect> + readonly getState: (input: { readonly agentName: AgentName }) => Effect.Effect + readonly endSession: (input: { readonly agentName: AgentName }) => Effect.Effect + readonly interruptTurn: (input: { readonly agentName: AgentName }) => Effect.Effect + readonly isIdle: (input: { readonly agentName: AgentName }) => Effect.Effect +} + +export const AgentService = Context.Tag()("@mini-agent/AgentService") + +export const AgentServiceLive = Layer.effect( + AgentService, + Effect.gen(function*() { + const registry = yield* AgentRegistry + + const withAgent = ( + agentName: AgentName, + useAgent: (agent: MiniAgent) => Effect.Effect + ): Effect.Effect => + Effect.gen(function*() { + const agent = yield* registry.getOrCreate(agentName).pipe(Effect.orDie) + return yield* useAgent(agent) + }) + + const addEvents = ({ agentName, events }: AddEventsInput) => + withAgent(agentName, (agent) => + Effect.forEach(events, (event) => agent.addEvent(event), { discard: true }) + ) + + const tapEventStream: AgentServiceApi["tapEventStream"] = ({ agentName }) => + withAgent(agentName, (agent) => agent.tapEventStream) + + const getEvents = ({ agentName }: GetEventsInput) => + withAgent(agentName, (agent) => agent.getEvents) + + const getState = ({ agentName }: { readonly agentName: AgentName }) => + withAgent(agentName, (agent) => agent.getState) + + const endSession = ({ agentName }: { readonly agentName: AgentName }) => + withAgent(agentName, (agent) => agent.endSession) + + const interruptTurn = ({ agentName }: { readonly agentName: AgentName }) => + withAgent(agentName, (agent) => agent.interruptTurn) + + const isIdle = ({ agentName }: { readonly agentName: AgentName }) => + withAgent(agentName, (agent) => agent.isIdle) + + return { + addEvents, + tapEventStream, + getEvents, + getState, + endSession, + interruptTurn, + isIdle + } satisfies AgentServiceApi + }) +) diff --git a/src/cli/chat-ui.ts b/src/cli/chat-ui.ts index 5f440f3..19a5ebb 100644 --- a/src/cli/chat-ui.ts +++ b/src/cli/chat-ui.ts @@ -5,15 +5,15 @@ * Return during streaming interrupts (with optional new message); Escape exits. */ import { DateTime, Effect, Fiber, Mailbox, Option, Stream } from "effect" -import { AgentRegistry } from "../agent-registry.ts" +import { AgentService } from "../agent-service.ts" import { type AgentName, type ContextSaveError, makeEventId, - type MiniAgent, type ReducerError, UserMessageEvent } from "../domain.ts" +import { deriveContextMetadata } from "./event-context.ts" import { type ChatController, runOpenTUIChat } from "./components/opentui-chat.tsx" type ChatSignal = @@ -22,19 +22,19 @@ type ChatSignal = export class ChatUI extends Effect.Service()("@mini-agent/ChatUI", { effect: Effect.gen(function*() { - const registry = yield* AgentRegistry + const agentService = yield* AgentService - const runChat = Effect.fn("ChatUI.runChat")(function*(contextName: string) { - // Get or create the agent - const agent = yield* registry.getOrCreate(contextName as AgentName) + const runChat = Effect.fn("ChatUI.runChat")(function*(agentNameInput: string) { + const agentName = agentNameInput as AgentName // Get existing events for history display - const existingEvents = yield* agent.getEvents + const existingEvents = yield* agentService.getEvents({ agentName }) + // Unbounded mailbox - unsafeOffer always succeeds (idiomatic Effect pattern) const mailbox = yield* Mailbox.make() const chat = yield* Effect.promise(() => - runOpenTUIChat(contextName, existingEvents, { + runOpenTUIChat(agentNameInput, existingEvents, { onSubmit: (text) => { mailbox.unsafeOffer({ _tag: "Input", text }) }, @@ -45,17 +45,18 @@ export class ChatUI extends Effect.Service()("@mini-agent/ChatUI", { ) // Subscribe to agent events and forward to UI - const subscriptionFiber = yield* agent.events.pipe( + const eventStream = yield* agentService.tapEventStream({ agentName }) + const subscriptionFiber = yield* eventStream.pipe( Stream.runForEach((event) => Effect.sync(() => chat.addEvent(event))), Effect.fork ) - yield* runChatLoop(agent, chat, mailbox).pipe( + yield* runChatLoop(agentName, chat, mailbox, agentService).pipe( Effect.catchAllCause(() => Effect.void), Effect.ensuring( Effect.gen(function*() { yield* Fiber.interrupt(subscriptionFiber) - yield* agent.endSession + yield* agentService.endSession({ agentName }) chat.cleanup() }) ) @@ -64,17 +65,18 @@ export class ChatUI extends Effect.Service()("@mini-agent/ChatUI", { return { runChat } }), - dependencies: [AgentRegistry.Default] + dependencies: [AgentService.Default] }) {} const runChatLoop = ( - agent: MiniAgent, + agentName: AgentName, chat: ChatController, - mailbox: Mailbox.Mailbox + mailbox: Mailbox.Mailbox, + agentService: AgentService ): Effect.Effect => Effect.fn("ChatUI.runChatLoop")(function*() { while (true) { - const result = yield* runChatTurn(agent, chat, mailbox) + const result = yield* runChatTurn(agentName, chat, mailbox, agentService) if (result._tag === "exit") { return } @@ -86,9 +88,10 @@ type TurnResult = | { readonly _tag: "exit" } const runChatTurn = ( - agent: MiniAgent, + agentName: AgentName, chat: ChatController, - mailbox: Mailbox.Mailbox + mailbox: Mailbox.Mailbox, + agentService: AgentService ): Effect.Effect => Effect.fn("ChatUI.runChatTurn")(function*() { const signal = yield* mailbox.take.pipe( @@ -106,24 +109,24 @@ const runChatTurn = ( return { _tag: "continue" } as const } - // Get current context to build proper event - const ctx = yield* agent.getReducedContext + const events = yield* agentService.getEvents({ agentName }) + const { contextName, nextEventNumber } = deriveContextMetadata(agentName, events) // Create user event with triggersAgentTurn=true to start LLM turn const userEvent = new UserMessageEvent({ - id: makeEventId(agent.contextName, ctx.nextEventNumber), + id: makeEventId(contextName, nextEventNumber), timestamp: DateTime.unsafeNow(), - agentName: agent.agentName, + agentName, parentEventId: Option.none(), triggersAgentTurn: true, content: userMessage }) // Add event to agent - this will broadcast to subscription and trigger LLM turn - yield* agent.addEvent(userEvent) + yield* agentService.addEvents({ agentName, events: [userEvent] }) // Wait for turn to complete or user interrupt - const result = yield* awaitTurnCompletion(agent, mailbox) + const result = yield* awaitTurnCompletion(agentName, mailbox, agentService) if (result._tag === "exit") { return { _tag: "exit" } as const @@ -131,12 +134,9 @@ const runChatTurn = ( if (result._tag === "interrupted") { if (result.newMessage) { - // User sent new message during streaming - this will trigger a new turn - // The agent's debounce processing will interrupt the current turn automatically - return yield* runChatTurnWithPending(agent, chat, mailbox, result.newMessage) + return yield* runChatTurnWithPending(agentName, chat, mailbox, result.newMessage, agentService) } else { - // User hit return with no text - just interrupt without starting new turn - yield* agent.interruptTurn + yield* agentService.interruptTurn({ agentName }) } } @@ -144,33 +144,35 @@ const runChatTurn = ( })() const runChatTurnWithPending = ( - agent: MiniAgent, + agentName: AgentName, chat: ChatController, mailbox: Mailbox.Mailbox, - pendingMessage: string + pendingMessage: string, + agentService: AgentService ): Effect.Effect => Effect.gen(function*() { - const ctx = yield* agent.getReducedContext + const events = yield* agentService.getEvents({ agentName }) + const { contextName, nextEventNumber } = deriveContextMetadata(agentName, events) const userEvent = new UserMessageEvent({ - id: makeEventId(agent.contextName, ctx.nextEventNumber), + id: makeEventId(contextName, nextEventNumber), timestamp: DateTime.unsafeNow(), - agentName: agent.agentName, + agentName, parentEventId: Option.none(), triggersAgentTurn: true, content: pendingMessage }) - yield* agent.addEvent(userEvent) + yield* agentService.addEvents({ agentName, events: [userEvent] }) - const result = yield* awaitTurnCompletion(agent, mailbox) + const result = yield* awaitTurnCompletion(agentName, mailbox, agentService) if (result._tag === "exit") { return { _tag: "exit" } as const } if (result._tag === "interrupted" && result.newMessage) { - return yield* runChatTurnWithPending(agent, chat, mailbox, result.newMessage) + return yield* runChatTurnWithPending(agentName, chat, mailbox, result.newMessage, agentService) } return { _tag: "continue" } as const @@ -182,15 +184,16 @@ type TurnCompletionResult = | { readonly _tag: "interrupted"; readonly newMessage: string | null } const awaitTurnCompletion = ( - agent: MiniAgent, - mailbox: Mailbox.Mailbox + agentName: AgentName, + mailbox: Mailbox.Mailbox, + agentService: AgentService ): Effect.Effect => Effect.fn("ChatUI.awaitTurnCompletion")(function*() { // Wait for either: turn completes OR user interrupts const waitForIdle = Effect.gen(function*() { // Poll for idle state while (true) { - const isIdle = yield* agent.isIdle + const isIdle = yield* agentService.isIdle({ agentName }) if (isIdle) { return { _tag: "completed" } as TurnCompletionResult } diff --git a/src/cli/commands.ts b/src/cli/commands.ts index 842228f..d6e6518 100644 --- a/src/cli/commands.ts +++ b/src/cli/commands.ts @@ -8,7 +8,8 @@ import { Command, Options, Prompt as CliPrompt } from "@effect/cli" import { type Error as PlatformError, FileSystem, HttpServer, type Terminal } from "@effect/platform" import { BunHttpServer, BunStream } from "@effect/platform-bun" import { Chunk, Console, DateTime, Effect, Fiber, Layer, Option, Schema, Stream } from "effect" -import { AgentRegistry } from "../agent-registry.ts" +import { AgentService } from "../agent-service.ts" +import { makeHttpAgentServiceLayer } from "../agent-service-http.ts" import { AppConfig, resolveBaseDir } from "../config.ts" import { type AgentName, @@ -22,6 +23,7 @@ import { EventStore } from "../event-store.ts" import { makeRouter } from "../http-routes.ts" import { layercodeCommand } from "../layercode/index.ts" import { printTraceLinks } from "../tracing.ts" +import { deriveContextMetadata } from "./event-context.ts" const encodeEvent = Schema.encodeSync(ContextEvent) @@ -80,6 +82,11 @@ const showEphemeralOption = Options.boolean("show-ephemeral").pipe( Options.withDefault(false) ) +const remoteBaseUrlOption = Options.text("remote-base-url").pipe( + Options.withDescription("Connect to a remote mini-agent server (overrides local agent)"), + Options.optional +) + const scriptOption = Options.boolean("script").pipe( Options.withAlias("s"), Options.withDescription("Script mode: read JSONL events from stdin, output JSONL events"), @@ -125,29 +132,27 @@ const handleEvent = ( /** Run the event stream, handling each event */ const runEventStream = ( - contextName: string, + agentName: AgentName, userMessage: string, options: OutputOptions, images: ReadonlyArray = [] ) => Effect.gen(function*() { - const registry = yield* AgentRegistry - const agent = yield* registry.getOrCreate(contextName as AgentName) + const agentService = yield* AgentService // Get existing events first (includes SessionStartedEvent emitted during agent creation) - const existingEvents = yield* agent.getEvents + const existingEvents = yield* agentService.getEvents({ agentName }) for (const event of existingEvents) { yield* handleEvent(event, options) } - // Get current context to build proper event - const ctx = yield* agent.getReducedContext + const { contextName, nextEventNumber } = deriveContextMetadata(agentName, existingEvents) // Create user event with triggersAgentTurn=true const userEvent = new UserMessageEvent({ - id: makeEventId(agent.contextName, ctx.nextEventNumber), + id: makeEventId(contextName, nextEventNumber), timestamp: DateTime.unsafeNow(), - agentName: agent.agentName, + agentName, parentEventId: Option.none(), triggersAgentTurn: true, content: userMessage, @@ -155,27 +160,29 @@ const runEventStream = ( }) // Subscribe to events - wait for turn completion first - const turnFiber = yield* agent.events.pipe( + const liveStream = yield* agentService.tapEventStream({ agentName }) + const turnFiber = yield* liveStream.pipe( Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), Stream.runForEach((event) => handleEvent(event, options)), Effect.fork ) // Add event to agent - triggers LLM turn - yield* agent.addEvent(userEvent) + yield* agentService.addEvents({ agentName, events: [userEvent] }) // Wait for turn to complete yield* Fiber.join(turnFiber).pipe(Effect.catchAllCause(() => Effect.void)) // Subscribe to capture SessionEndedEvent - const sessionEndFiber = yield* agent.events.pipe( + const sessionEndStream = yield* agentService.tapEventStream({ agentName }) + const sessionEndFiber = yield* sessionEndStream.pipe( Stream.takeUntil((e) => e._tag === "SessionEndedEvent"), Stream.runForEach((event) => handleEvent(event, options)), Effect.fork ) // End session (emits SessionEndedEvent) - yield* agent.endSession + yield* agentService.endSession({ agentName }) // Wait for SessionEndedEvent yield* Fiber.join(sessionEndFiber).pipe(Effect.catchAllCause(() => Effect.void)) @@ -200,6 +207,7 @@ const determineMode = (options: { const utf8Decoder = new TextDecoder("utf-8") + const readAllStdin: Effect.Effect = BunStream.stdin.pipe( Stream.mapChunks(Chunk.map((bytes) => utf8Decoder.decode(bytes))), Stream.runCollect, @@ -229,13 +237,12 @@ const stdinEvents = BunStream.stdin.pipe( ) ) -const scriptInteractiveLoop = (contextName: string, options: OutputOptions) => +const scriptInteractiveLoop = (agentName: AgentName, options: OutputOptions) => Effect.gen(function*() { - const registry = yield* AgentRegistry - const agent = yield* registry.getOrCreate(contextName as AgentName) + const agentService = yield* AgentService // Output existing events first (includes SessionStartedEvent) - const existingEvents = yield* agent.getEvents + const existingEvents = yield* agentService.getEvents({ agentName }) for (const event of existingEvents) { yield* handleEvent(event, options) } @@ -248,27 +255,29 @@ const scriptInteractiveLoop = (contextName: string, options: OutputOptions) => const isUserMessage = inputMsg._tag === "UserMessage" || inputMsg._tag === "UserMessageEvent" if (isUserMessage) { - // Get current context - const ctx = yield* agent.getReducedContext + // Get current context metadata + const currentEvents = yield* agentService.getEvents({ agentName }) + const { contextName, nextEventNumber } = deriveContextMetadata(agentName, currentEvents) // Create proper event with triggersAgentTurn const userEvent = new UserMessageEvent({ - id: makeEventId(agent.contextName, ctx.nextEventNumber), + id: makeEventId(contextName, nextEventNumber), timestamp: DateTime.unsafeNow(), - agentName: agent.agentName, + agentName, parentEventId: Option.none(), triggersAgentTurn: true, content: inputMsg.content }) // Subscribe to events - wait for turn completion to process next message - const eventFiber = yield* agent.events.pipe( + const inputStream = yield* agentService.tapEventStream({ agentName }) + const eventFiber = yield* inputStream.pipe( Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), Stream.runForEach((outputEvent) => handleEvent(outputEvent, options)), Effect.fork ) - yield* agent.addEvent(userEvent) + yield* agentService.addEvents({ agentName, events: [userEvent] }) yield* Fiber.join(eventFiber).pipe(Effect.catchAllCause(() => Effect.void)) } else { yield* Effect.logDebug("SystemPrompt events in script mode are echoed but not persisted") @@ -279,13 +288,14 @@ const scriptInteractiveLoop = (contextName: string, options: OutputOptions) => ) // Subscribe to capture SessionEndedEvent - const sessionEndFiber = yield* agent.events.pipe( + const scriptSessionStream = yield* agentService.tapEventStream({ agentName }) + const sessionEndFiber = yield* scriptSessionStream.pipe( Stream.takeUntil((e) => e._tag === "SessionEndedEvent"), Stream.runForEach((event) => handleEvent(event, options)), Effect.fork ) - yield* agent.endSession + yield* agentService.endSession({ agentName }) yield* Fiber.join(sessionEndFiber).pipe(Effect.catchAllCause(() => Effect.void)) }) @@ -329,7 +339,7 @@ const selectOrCreateContext = Effect.gen(function*() { return selected }) -const generateRandomContextName = (): string => { +const generateRandomAgentName = (): string => { const chars = "abcdefghijklmnopqrstuvwxyz0123456789" const suffix = Array.from({ length: 5 }, () => chars[Math.floor(Math.random() * chars.length)]).join("") return `chat-${suffix}` @@ -371,11 +381,14 @@ const runChat = (options: { script: boolean showEphemeral: boolean images: ReadonlyArray + remoteBaseUrl: Option.Option }) => Effect.gen(function*() { yield* Effect.logDebug("Starting chat session") const mode = determineMode(options) - const contextName = Option.getOrElse(options.name, generateRandomContextName) + const agentNameString = Option.getOrElse(options.name, generateRandomAgentName) + const agentName = agentNameString as AgentName + const remoteBaseUrl = options.remoteBaseUrl const outputOptions: OutputOptions = { raw: mode === "script" || options.raw, @@ -390,42 +403,68 @@ const runChat = (options: { switch (mode) { case "single-turn": { const message = Option.getOrElse(options.message, () => "") - yield* runEventStream(contextName, message, outputOptions, imageDataUris) - if (!outputOptions.raw) { - yield* printTraceLinks - } + const singleTurnEffect = runEventStream(agentName, message, outputOptions, imageDataUris) + const tracedEffect = outputOptions.raw + ? singleTurnEffect + : singleTurnEffect.pipe( + Effect.ensuring( + printTraceLinks.pipe(Effect.catchAll(() => Effect.void)) + ) + ) + yield* tracedEffect break } case "pipe": { const input = yield* readAllStdin if (input !== "") { - yield* runEventStream(contextName, input, { raw: false, showEphemeral: false }, imageDataUris) + const pipeEffect = runEventStream(agentName, input, { raw: false, showEphemeral: false }, imageDataUris) + yield* pipeEffect.pipe( + Effect.ensuring( + printTraceLinks.pipe(Effect.catchAll(() => Effect.void)) + ) + ) } break } case "script": { - yield* scriptInteractiveLoop(contextName, outputOptions) + yield* scriptInteractiveLoop(agentName, outputOptions) break } case "tty-interactive": { const resolvedName = Option.isSome(options.name) - ? contextName - : yield* selectOrCreateContext + ? agentNameString + : Option.isSome(remoteBaseUrl) + ? yield* CliPrompt.text({ message: "Enter a name for your remote conversation" }) + : yield* selectOrCreateContext const { ChatUI } = yield* Effect.promise(() => import("./chat-ui.ts")) const chatUI = yield* ChatUI - yield* chatUI.runChat(resolvedName).pipe( - Effect.catchAllCause(() => Effect.void), - Effect.ensuring(printTraceLinks.pipe(Effect.flatMap(() => Console.log("\nGoodbye!")))) + const ttyEffect = chatUI.runChat(resolvedName).pipe( + Effect.ensuring( + Effect.all( + [ + printTraceLinks.pipe(Effect.catchAll(() => Effect.void)), + Console.log("\nGoodbye!") + ], + { discard: true } + ) + ) ) + + yield* ttyEffect break } } }).pipe( + (effect) => + Option.match(options.remoteBaseUrl, { + onNone: () => effect, + onSome: (url) => effect.pipe(Effect.provide(makeHttpAgentServiceLayer(url))) + }), Effect.provide(makeChatUILayer()), Effect.withSpan("chat-session") ) @@ -499,9 +538,11 @@ const chatCommand = Command.make( script: scriptOption, showEphemeral: showEphemeralOption, images: imageOption + , + remoteBaseUrl: remoteBaseUrlOption }, - ({ images, message, name, raw, script, showEphemeral }) => - runChat({ images, message, name, raw, script, showEphemeral }) + ({ images, message, name, raw, remoteBaseUrl, script, showEphemeral }) => + runChat({ images, message, name, raw, remoteBaseUrl, script, showEphemeral }) ).pipe(Command.withDescription("Chat with an AI assistant using persistent context history")) const logTestCommand = Command.make( @@ -576,13 +617,26 @@ export const serveCommand = Command.make( yield* Console.log("Endpoints:") yield* Console.log(" POST /agent/:agentName") yield* Console.log(" Send user message, receive SSE stream of events") + yield* Console.log(" POST /agent/:agentName/events") + yield* Console.log(" Append events; optional streamUntilIdle SSE response") + yield* Console.log(" POST /agent/:agentName/end-session") + yield* Console.log(" Gracefully end the agent session") + yield* Console.log(" POST /agent/:agentName/interrupt") + yield* Console.log(" Interrupt the current LLM turn if one is running") yield* Console.log("") yield* Console.log(" GET /agent/:agentName/events") yield* Console.log(" Subscribe to agent event stream (SSE)") + yield* Console.log(" GET /agent/:agentName/events/live") + yield* Console.log(" Subscribe to live-only event stream (SSE)") + yield* Console.log(" GET /agent/:agentName/events/history") + yield* Console.log(" Get JSON snapshot of all events") yield* Console.log("") yield* Console.log(" GET /agent/:agentName/state") yield* Console.log(" Get current agent state") yield* Console.log("") + yield* Console.log(" GET /agent/:agentName/idle") + yield* Console.log(" Check if the agent is currently idle") + yield* Console.log("") yield* Console.log(" GET /health") yield* Console.log(" Health check endpoint") yield* Console.log("") diff --git a/src/cli/components/opentui-chat.tsx b/src/cli/components/opentui-chat.tsx index 54c703b..41ad0a0 100644 --- a/src/cli/components/opentui-chat.tsx +++ b/src/cli/components/opentui-chat.tsx @@ -541,13 +541,13 @@ export interface ChatController { } interface ChatAppProps { - contextName: string + agentName: string initialEvents: ReadonlyArray callbacks: ChatCallbacks controllerRef: React.MutableRefObject } -function ChatApp({ callbacks, contextName, controllerRef, initialEvents }: ChatAppProps) { +function ChatApp({ agentName, callbacks, controllerRef, initialEvents }: ChatAppProps) { // Derive initial feed items from initial events (runs once on mount) const initialFeedItems = useMemo( () => @@ -628,7 +628,7 @@ function ChatApp({ callbacks, contextName, controllerRef, initialEvents }: ChatA - Agent: {contextName} · + Agent: {agentName} · {isStreaming ? " Return to interrupt" : " Ctrl+C to exit"} @@ -637,7 +637,7 @@ function ChatApp({ callbacks, contextName, controllerRef, initialEvents }: ChatA } export async function runOpenTUIChat( - contextName: string, + agentName: string, initialEvents: ReadonlyArray, callbacks: ChatCallbacks ): Promise { @@ -669,7 +669,7 @@ export async function runOpenTUIChat( root.render( +): EventContextMetadata => { + const fallbackContext = `${agentName}${DEFAULT_CONTEXT_SUFFIX}` as ContextName + if (events.length === 0) { + return { contextName: fallbackContext, nextEventNumber: 0 } + } + + const lastEvent = events[events.length - 1]! + const segments = lastEvent.id.split(":") + const counterRaw = segments.pop() ?? "0" + const contextSegment = segments.join(":") + const parsedCounter = Number.parseInt(counterRaw, 10) + + return { + contextName: (contextSegment || fallbackContext) as ContextName, + nextEventNumber: Number.isNaN(parsedCounter) ? 0 : parsedCounter + 1 + } +} diff --git a/src/cli/main.ts b/src/cli/main.ts index 3f7363a..3af8cbf 100644 --- a/src/cli/main.ts +++ b/src/cli/main.ts @@ -8,6 +8,7 @@ import { FetchHttpClient } from "@effect/platform" import { BunContext, BunRuntime } from "@effect/platform-bun" import { Cause, Effect, Layer } from "effect" import { AgentRegistry } from "../agent-registry.ts" +import { AgentService } from "../agent-service.ts" import { AppConfig, extractConfigPath, @@ -100,7 +101,7 @@ const makeMainLayer = (args: ReadonlyArray) => const tracingLayer = createTracingLayer("mini-agent") // AgentRegistry.Default requires EventStore, EventReducer, and MiniAgentTurn - return AgentRegistry.Default.pipe( + const registryLayer = AgentRegistry.Default.pipe( Layer.provideMerge(LlmTurnLive), Layer.provideMerge(languageModelLayer), Layer.provideMerge(llmConfigLayer), @@ -112,6 +113,10 @@ const makeMainLayer = (args: ReadonlyArray) => Layer.provideMerge(loggingLayer), Layer.provideMerge(BunContext.layer) ) + const agentServiceLayer = AgentService.Default.pipe( + Layer.provide(registryLayer) + ) + return Layer.mergeAll(registryLayer, agentServiceLayer) }) return Layer.unwrapEffect(buildLayers.pipe(Effect.provide(loggingLayer))) diff --git a/src/domain.ts b/src/domain.ts index d405904..8fde7cf 100644 --- a/src/domain.ts +++ b/src/domain.ts @@ -284,16 +284,13 @@ export interface MiniAgent { readonly addEvent: (event: ContextEvent) => Effect.Effect /** * Subscribe to live events. Returns an Effect that, when it completes, - * guarantees the subscription is established. Use this instead of `events` - * when you need to ensure you don't miss events added immediately after subscribing. + * guarantees the subscription is established. * * The returned stream is scoped to the caller's scope. */ - readonly subscribe: Effect.Effect, never, Scope.Scope> - /** @deprecated Use subscribe instead - events stream has race condition on subscription timing */ - readonly events: Stream.Stream + readonly tapEventStream: Effect.Effect, never, Scope.Scope> readonly getEvents: Effect.Effect> - readonly getReducedContext: Effect.Effect + readonly getState: Effect.Effect /** Gracefully end session: emit SessionEndedEvent (with AgentTurnInterruptedEvent if mid-turn), then close mailbox */ readonly endSession: Effect.Effect /** True when no LLM turn is in progress */ diff --git a/src/http-routes.ts b/src/http-routes.ts index 7e7060c..926fb39 100644 --- a/src/http-routes.ts +++ b/src/http-routes.ts @@ -10,8 +10,9 @@ import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" import { Chunk, Effect, Fiber, Schema, Stream } from "effect" -import { AgentRegistry } from "./agent-registry.ts" +import { AgentService } from "./agent-service.ts" import { type AgentName, ContextEvent, makeBaseEventFields, UserMessageEvent } from "./domain.ts" +import { deriveContextMetadata } from "./cli/event-context.ts" const encodeEvent = Schema.encodeSync(ContextEvent) @@ -26,6 +27,12 @@ const InputMessage = Schema.Struct({ }) type InputMessage = typeof InputMessage.Type +const AddEventsBody = Schema.Struct({ + events: Schema.Array(ContextEvent), + streamUntilIdle: Schema.optional(Schema.Boolean) +}) +type AddEventsBody = typeof AddEventsBody.Type + /** Parse JSON body into InputMessage */ const parseBody = (body: string) => Effect.gen(function*() { @@ -36,16 +43,38 @@ const parseBody = (body: string) => return yield* Schema.decodeUnknown(InputMessage)(json) }) +const shouldStopTurn = (event: ContextEvent): boolean => + event._tag === "AgentTurnCompletedEvent" || + event._tag === "AgentTurnFailedEvent" || + event._tag === "AgentTurnInterruptedEvent" + +const waitForIdleStable = (agentService: AgentService, agentName: AgentName, duration = 50) => + Effect.gen(function*() { + while (true) { + const idle = yield* agentService.isIdle({ agentName }) + if (idle) { + yield* Effect.sleep(`${duration} millis`) + const stillIdle = yield* agentService.isIdle({ agentName }) + if (stillIdle) { + return + } + } else { + yield* Effect.sleep("25 millis") + } + } + }) + /** Handler for POST /agent/:agentName */ const agentHandler = Effect.gen(function*() { const request = yield* HttpServerRequest.HttpServerRequest - const registry = yield* AgentRegistry + const agentService = yield* AgentService const params = yield* HttpRouter.params - const agentName = params.agentName - if (!agentName) { + const agentNameParam = params.agentName + if (!agentNameParam) { return HttpServerResponse.text("Missing agentName", { status: 400 }) } + const agentName = agentNameParam as AgentName yield* Effect.logDebug("POST /agent/:agentName", { agentName }) @@ -65,22 +94,19 @@ const agentHandler = Effect.gen(function*() { const message = parseResult.right // Get or create agent - const agent = yield* registry.getOrCreate(agentName as AgentName) - // Get existing events to include initial session events - const existingEvents = yield* agent.getEvents - - const ctx = yield* agent.getReducedContext + const existingEvents = yield* agentService.getEvents({ agentName }) + const { contextName, nextEventNumber } = deriveContextMetadata(agentName, existingEvents) // Prepare user event const userEvent = new UserMessageEvent({ - ...makeBaseEventFields(agentName as AgentName, agent.contextName, ctx.nextEventNumber, true), + ...makeBaseEventFields(agentName, contextName, nextEventNumber, true), content: message.content }) // Subscribe BEFORE adding event to guarantee we catch all events // PubSub.subscribe guarantees subscription is established when this completes - const liveEvents = yield* agent.subscribe + const liveEvents = yield* agentService.tapEventStream({ agentName }) // Fork collection before adding event const eventFiber = yield* liveEvents.pipe( @@ -93,7 +119,7 @@ const agentHandler = Effect.gen(function*() { Effect.fork ) - yield* agent.addEvent(userEvent) + yield* agentService.addEvents({ agentName, events: [userEvent] }) // Wait for the turn to complete and get all new events const newEventsChunk = yield* Fiber.join(eventFiber).pipe( @@ -118,24 +144,23 @@ const agentHandler = Effect.gen(function*() { /** Handler for GET /agent/:agentName/events - Subscribe to agent event stream */ const agentEventsHandler = Effect.gen(function*() { - const registry = yield* AgentRegistry + const agentService = yield* AgentService const params = yield* HttpRouter.params - const agentName = params.agentName - if (!agentName) { + const agentNameParam = params.agentName + if (!agentNameParam) { return HttpServerResponse.text("Missing agentName", { status: 400 }) } + const agentName = agentNameParam as AgentName yield* Effect.logDebug("GET /agent/:agentName/events", { agentName }) - const agent = yield* registry.getOrCreate(agentName as AgentName) - // Subscribe to live events FIRST to guarantee we don't miss any // PubSub.subscribe guarantees subscription is established when this completes - const liveEvents = yield* agent.subscribe + const liveEvents = yield* agentService.tapEventStream({ agentName }) // Get existing events (captured at subscription time) - const existingEvents = yield* agent.getEvents + const existingEvents = yield* agentService.getEvents({ agentName }) const existingStream = Stream.fromIterable(existingEvents) // Stream terminates when SessionEndedEvent is received @@ -153,24 +178,204 @@ const agentEventsHandler = Effect.gen(function*() { }) }) +/** Handler for GET /agent/:agentName/events/live - Subscribe to live-only stream */ +const agentEventsLiveHandler = Effect.gen(function*() { + const agentService = yield* AgentService + const params = yield* HttpRouter.params + + const agentNameParam = params.agentName + if (!agentNameParam) { + return HttpServerResponse.text("Missing agentName", { status: 400 }) + } + const agentName = agentNameParam as AgentName + + yield* Effect.logDebug("GET /agent/:agentName/events/live", { agentName }) + + const liveEvents = yield* agentService.tapEventStream({ agentName }) + const sseStream = liveEvents.pipe(Stream.map(encodeSSE)) + + return HttpServerResponse.stream(sseStream, { + contentType: "text/event-stream", + headers: { + "Cache-Control": "no-cache", + "Connection": "keep-alive" + } + }) +}) + +/** Handler for GET /agent/:agentName/history - JSON snapshot of events */ +const agentHistoryHandler = Effect.gen(function*() { + const agentService = yield* AgentService + const params = yield* HttpRouter.params + + const agentNameParam = params.agentName + if (!agentNameParam) { + return HttpServerResponse.text("Missing agentName", { status: 400 }) + } + const agentName = agentNameParam as AgentName + + yield* Effect.logDebug("GET /agent/:agentName/history", { agentName }) + + const events = yield* agentService.getEvents({ agentName }) + const encoded = events.map(encodeEvent) + + return yield* HttpServerResponse.json({ + agentName: agentNameParam, + events: encoded + }) +}) + +/** Handler for POST /agent/:agentName/events - Add events, optional idle streaming */ +const addEventsHandler = Effect.gen(function*() { + const request = yield* HttpServerRequest.HttpServerRequest + const agentService = yield* AgentService + const params = yield* HttpRouter.params + + const agentNameParam = params.agentName + if (!agentNameParam) { + return HttpServerResponse.text("Missing agentName", { status: 400 }) + } + const agentName = agentNameParam as AgentName + + const bodyText = yield* request.text + if (bodyText.trim() === "") { + return HttpServerResponse.text("Empty request body", { status: 400 }) + } + + const parsedJson = yield* Effect.try({ + try: () => JSON.parse(bodyText) as unknown, + catch: (e) => new Error(`Invalid JSON: ${e instanceof Error ? e.message : String(e)}`) + }).pipe(Effect.either) + + if (parsedJson._tag === "Left") { + return HttpServerResponse.text(parsedJson.left.message, { status: 400 }) + } + + const parsedBodyResult = yield* Schema.decodeUnknown(AddEventsBody)(parsedJson.right).pipe(Effect.either) + if (parsedBodyResult._tag === "Left") { + return HttpServerResponse.text("Invalid events payload", { status: 400 }) + } + + const { events, streamUntilIdle = false } = parsedBodyResult.right + const triggersTurn = events.some((event) => event.triggersAgentTurn) + + yield* Effect.logDebug("POST /agent/:agentName/events", { + agentName, + eventCount: events.length, + streamUntilIdle + }) + + let newEvents: Array = [] + + if (streamUntilIdle && triggersTurn) { + const liveEvents = yield* agentService.tapEventStream({ agentName }) + const eventFiber = yield* liveEvents.pipe( + Stream.takeUntil(shouldStopTurn), + Stream.runCollect, + Effect.fork + ) + + yield* agentService.addEvents({ agentName, events }) + const collected = yield* Fiber.join(eventFiber).pipe( + Effect.catchAll(() => Effect.succeed(Chunk.empty())) + ) + newEvents = Chunk.toArray(collected) + yield* waitForIdleStable(agentService, agentName) + + const sseStream = Stream.fromIterable([...events, ...newEvents]).pipe( + Stream.map(encodeSSE) + ) + + return HttpServerResponse.stream(sseStream, { + contentType: "text/event-stream", + headers: { + "Cache-Control": "no-cache", + "Connection": "keep-alive" + } + }) + } + + yield* agentService.addEvents({ agentName, events }) + + if (streamUntilIdle && !triggersTurn) { + const sseStream = Stream.fromIterable(events).pipe(Stream.map(encodeSSE)) + return HttpServerResponse.stream(sseStream, { + contentType: "text/event-stream", + headers: { + "Cache-Control": "no-cache", + "Connection": "keep-alive" + } + }) + } + + return HttpServerResponse.json({ status: "ok" }) +}) + +/** Handler for POST /agent/:agentName/end-session */ +const endSessionHandler = Effect.gen(function*() { + const agentService = yield* AgentService + const params = yield* HttpRouter.params + + const agentNameParam = params.agentName + if (!agentNameParam) { + return HttpServerResponse.text("Missing agentName", { status: 400 }) + } + const agentName = agentNameParam as AgentName + + yield* agentService.endSession({ agentName }) + return HttpServerResponse.json({ status: "ended" }) +}) + +/** Handler for POST /agent/:agentName/interrupt */ +const interruptHandler = Effect.gen(function*() { + const agentService = yield* AgentService + const params = yield* HttpRouter.params + + const agentNameParam = params.agentName + if (!agentNameParam) { + return HttpServerResponse.text("Missing agentName", { status: 400 }) + } + const agentName = agentNameParam as AgentName + + yield* agentService.interruptTurn({ agentName }) + return HttpServerResponse.json({ status: "interrupted" }) +}) + +/** Handler for GET /agent/:agentName/idle */ +const idleStatusHandler = Effect.gen(function*() { + const agentService = yield* AgentService + const params = yield* HttpRouter.params + + const agentNameParam = params.agentName + if (!agentNameParam) { + return HttpServerResponse.text("Missing agentName", { status: 400 }) + } + const agentName = agentNameParam as AgentName + + const idle = yield* agentService.isIdle({ agentName }) + return yield* HttpServerResponse.json({ agentName: agentNameParam, idle }) +}) + /** Handler for GET /agent/:agentName/state - Get reduced agent state */ const agentStateHandler = Effect.gen(function*() { - const registry = yield* AgentRegistry + const agentService = yield* AgentService const params = yield* HttpRouter.params - const agentName = params.agentName - if (!agentName) { + const agentNameParam = params.agentName + if (!agentNameParam) { return HttpServerResponse.text("Missing agentName", { status: 400 }) } + const agentName = agentNameParam as AgentName yield* Effect.logDebug("GET /agent/:agentName/state", { agentName }) - const agent = yield* registry.getOrCreate(agentName as AgentName) - const reducedContext = yield* agent.getReducedContext + const reducedContext = yield* agentService.getState({ agentName }) + const events = yield* agentService.getEvents({ agentName }) + const { contextName } = deriveContextMetadata(agentName, events) return yield* HttpServerResponse.json({ - agentName, - contextName: agent.contextName, + agentName: agentNameParam, + contextName, nextEventNumber: reducedContext.nextEventNumber, currentTurnNumber: reducedContext.currentTurnNumber, messageCount: reducedContext.messages.length, @@ -188,7 +393,13 @@ const healthHandler = Effect.gen(function*() { /** HTTP router */ export const makeRouter = HttpRouter.empty.pipe( HttpRouter.post("/agent/:agentName", agentHandler), + HttpRouter.post("/agent/:agentName/events", addEventsHandler), + HttpRouter.post("/agent/:agentName/end-session", endSessionHandler), + HttpRouter.post("/agent/:agentName/interrupt", interruptHandler), HttpRouter.get("/agent/:agentName/events", agentEventsHandler), + HttpRouter.get("/agent/:agentName/events/live", agentEventsLiveHandler), + HttpRouter.get("/agent/:agentName/events/history", agentHistoryHandler), HttpRouter.get("/agent/:agentName/state", agentStateHandler), + HttpRouter.get("/agent/:agentName/idle", idleStatusHandler), HttpRouter.get("/health", healthHandler) ) diff --git a/src/layercode/layercode.adapter.ts b/src/layercode/layercode.adapter.ts index 74baf3a..bbbbf45 100644 --- a/src/layercode/layercode.adapter.ts +++ b/src/layercode/layercode.adapter.ts @@ -16,16 +16,16 @@ */ import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" import { Chunk, Effect, Fiber, Option, Schema, Stream } from "effect" -import { AgentRegistry } from "../agent-registry.ts" +import { AgentService } from "../agent-service.ts" import { AppConfig } from "../config.ts" import { type AgentName, type ContextEvent, - type ContextName, makeBaseEventFields, type TextDeltaEvent, UserMessageEvent } from "../domain.ts" +import { deriveContextMetadata } from "../cli/event-context.ts" import { maybeVerifySignature } from "./signature.ts" /** LayerCode incoming webhook event types */ @@ -118,7 +118,7 @@ const toLayerCodeResponse = ( const layercodeWebhookHandler = (welcomeMessage: Option.Option) => Effect.gen(function*() { const request = yield* HttpServerRequest.HttpServerRequest - const registry = yield* AgentRegistry + const agentService = yield* AgentService const config = yield* AppConfig yield* Effect.logDebug("POST /layercode/webhook") @@ -162,19 +162,18 @@ const layercodeWebhookHandler = (welcomeMessage: Option.Option) => switch (webhookEvent.type) { case "message": { const agentName = sessionToAgentName(webhookEvent.session_id) - const contextName = `${agentName}-v1` as ContextName const turnId = webhookEvent.turn_id - const agent = yield* registry.getOrCreate(agentName) - const ctx = yield* agent.getReducedContext + const existingEvents = yield* agentService.getEvents({ agentName }) + const { contextName, nextEventNumber } = deriveContextMetadata(agentName, existingEvents) const userEvent = new UserMessageEvent({ - ...makeBaseEventFields(agentName, contextName, ctx.nextEventNumber, true), + ...makeBaseEventFields(agentName, contextName, nextEventNumber, true), content: webhookEvent.text }) // Subscribe to events - subscription is guaranteed established when this completes - const eventStream = yield* agent.subscribe + const eventStream = yield* agentService.tapEventStream({ agentName }) const eventFiber = yield* eventStream.pipe( Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), Stream.runCollect, @@ -182,7 +181,7 @@ const layercodeWebhookHandler = (welcomeMessage: Option.Option) => ) // Add the user event to trigger the turn (no delay needed - subscription is guaranteed) - yield* agent.addEvent(userEvent) + yield* agentService.addEvents({ agentName, events: [userEvent] }) // Wait for the turn to complete and get all new events const newEventsChunk = yield* Fiber.join(eventFiber).pipe( @@ -314,7 +313,7 @@ export const makeLayerCodeRouter = ( welcomeMessage: Option.Option ): HttpRouter.HttpRouter< never, - | AgentRegistry + | AgentService | AppConfig > => HttpRouter.empty.pipe( diff --git a/src/mini-agent.ts b/src/mini-agent.ts index e98329c..9c582b5 100644 --- a/src/mini-agent.ts +++ b/src/mini-agent.ts @@ -381,7 +381,6 @@ export const makeMiniAgent = ( // Now signal completion to all subscribers: yield* PubSub.shutdown(pubsub) // Signal to PubSub subscribers that stream is done - yield* mailbox.end // Signal to deprecated .events subscribers }) // Internal shutdown for scope cleanup - bypasses queue to avoid deadlock @@ -422,12 +421,12 @@ export const makeMiniAgent = ( // Signal completion to all subscribers yield* PubSub.shutdown(pubsub).pipe(Effect.catchAll(() => Effect.void)) - yield* mailbox.end }) // Try to get config and LLM config (optional - may not be available in tests) const appConfigOption = yield* Effect.serviceOption(AppConfig) const llmConfigOption = yield* Effect.serviceOption(CurrentLlmConfig) + const isFirstSession = existingEvents.length === 0 // Emit session started (use current nextEventNumber to avoid collision with loaded events) const state = yield* Ref.get(stateRef) @@ -440,8 +439,8 @@ export const makeMiniAgent = ( }) yield* addEventInternal(sessionStartEvent) - // Emit LLM config event if available - if (Option.isSome(llmConfigOption)) { + // Emit LLM config event on first-ever session + if (isFirstSession && Option.isSome(llmConfigOption)) { const llmConfig = llmConfigOption.value const stateAfterLlm = yield* Ref.get(stateRef) const llmConfigEvent = new SetLlmConfigEvent({ @@ -457,9 +456,10 @@ export const makeMiniAgent = ( }) yield* addEventInternal(llmConfigEvent) } + // TODO: If CLI LLM config diverges from reduced context, emit SetLlmConfigEvent to update - // Emit system prompt event if config available - if (Option.isSome(appConfigOption)) { + // Emit system prompt event on first-ever session + if (isFirstSession && Option.isSome(appConfigOption)) { const appConfig = appConfigOption.value const stateAfterConfig = yield* Ref.get(stateRef) const systemPromptEvent = new SystemPromptEvent({ @@ -531,17 +531,15 @@ export const makeMiniAgent = ( addEvent: (event) => addEventInternal(event), - subscribe: Effect.gen(function*() { + tapEventStream: Effect.gen(function*() { // PubSub.subscribe guarantees subscription is established when this effect completes const dequeue = yield* PubSub.subscribe(pubsub) return Stream.fromQueue(dequeue) }), - events: broadcast, - getEvents: Ref.get(stateRef).pipe(Effect.map((s) => s.events)), - getReducedContext: Ref.get(stateRef).pipe(Effect.map((s) => s.reducedContext)), + getState: Ref.get(stateRef).pipe(Effect.map((s) => s.reducedContext)), endSession: endSessionEffect, diff --git a/src/server.ts b/src/server.ts index a51ac33..835e5ff 100644 --- a/src/server.ts +++ b/src/server.ts @@ -11,6 +11,7 @@ import { FetchHttpClient, HttpServer } from "@effect/platform" import { BunContext, BunHttpServer, BunRuntime } from "@effect/platform-bun" import { ConfigProvider, Effect, Layer, LogLevel, Option } from "effect" import { AgentRegistry } from "./agent-registry.ts" +import { AgentService } from "./agent-service.ts" import { AppConfig, type MiniAgentConfig } from "./config.ts" import { EventReducer } from "./event-reducer.ts" import { EventStoreFileSystem } from "./event-store-fs.ts" @@ -97,7 +98,7 @@ const program = Effect.gen(function*() { // Build the full layer stack // AgentRegistry.Default requires EventStore, EventReducer, and MiniAgentTurn - const serviceLayer = AgentRegistry.Default.pipe( + const registryLayer = AgentRegistry.Default.pipe( Layer.provide(LlmTurnLive), Layer.provide(languageModelLayer), Layer.provide(llmConfigLayer), @@ -106,6 +107,8 @@ const program = Effect.gen(function*() { Layer.provide(appConfigLayer), Layer.provide(BunContext.layer) ) + const agentServiceLayer = AgentService.Default.pipe(Layer.provide(registryLayer)) + const serviceLayer = Layer.mergeAll(registryLayer, agentServiceLayer) // HTTP server layer // Set idleTimeout high for SSE streaming - Bun defaults to 10s which kills long-running streams diff --git a/test/cli.e2e.test.ts b/test/cli.e2e.test.ts index 579a59b..68988e2 100644 --- a/test/cli.e2e.test.ts +++ b/test/cli.e2e.test.ts @@ -188,6 +188,29 @@ describe("CLI", () => { expect(jsonOutput).toContain("\"AssistantMessageEvent\"") expect(jsonOutput).toContain("\"AgentTurnCompletedEvent\"") }) + + test("omits config events on resumed sessions", { timeout: 20000 }, async ({ llmEnv, testDir }) => { + const contextName = "raw-resume" + + await Effect.runPromise( + runCli(["chat", "-n", contextName, "-m", "seed", "--raw"], { + cwd: testDir, + env: llmEnv + }) + ) + + const secondRun = await Effect.runPromise( + runCli(["chat", "-n", contextName, "-m", "second run", "--raw"], { + cwd: testDir, + env: llmEnv + }) + ) + + const jsonOutput = extractJsonOutput(secondRun.stdout) + expect(jsonOutput).not.toContain("\"SetLlmConfigEvent\"") + expect(jsonOutput).not.toContain("\"SystemPromptEvent\"") + expect(jsonOutput).toContain("\"SessionStartedEvent\"") + }) }) describe("pipe mode (default for piped stdin)", () => { diff --git a/test/mini-agent.test.ts b/test/mini-agent.test.ts index 149a439..0a628da 100644 --- a/test/mini-agent.test.ts +++ b/test/mini-agent.test.ts @@ -105,7 +105,7 @@ describe("MiniAgent", () => { it.effect("initializes with empty messages in reducedContext", () => Effect.gen(function*() { const agent = yield* makeMiniAgent(testAgentName, testContextName) - const ctx = yield* agent.getReducedContext + const ctx = yield* agent.getState // SessionStartedEvent doesn't add to messages array expect(ctx.messages).toEqual([]) }).pipe( @@ -147,7 +147,7 @@ describe("MiniAgent", () => { // Wait for event to be processed (fire-and-forget) yield* waitForEventTag(agent, "UserMessageEvent") - const ctx = yield* agent.getReducedContext + const ctx = yield* agent.getState // SessionStarted (1) + UserMessage (1) = 2 expect(ctx.nextEventNumber).toBe(2) }).pipe( @@ -168,7 +168,7 @@ describe("MiniAgent", () => { // Wait for event to be processed (fire-and-forget) yield* waitForEventTag(agent, "UserMessageEvent") - const ctx = yield* agent.getReducedContext + const ctx = yield* agent.getState expect(ctx.messages.length).toBe(1) expect(ctx.messages[0]?.role).toBe("user") }).pipe( @@ -185,7 +185,7 @@ describe("MiniAgent", () => { yield* waitForEventTag(agent, "SessionStartedEvent") // Subscribe for UserMessages - const stream = yield* agent.subscribe + const stream = yield* agent.tapEventStream const collector = yield* stream.pipe( Stream.filter((e) => e._tag === "UserMessageEvent"), Stream.take(2), @@ -221,7 +221,7 @@ describe("MiniAgent", () => { )) }) - describe("getReducedContext", () => { + describe("getState", () => { it.effect("accumulates multiple messages", () => Effect.gen(function*() { const agent = yield* makeMiniAgent(testAgentName, testContextName) @@ -229,7 +229,7 @@ describe("MiniAgent", () => { yield* waitForEventTag(agent, "SessionStartedEvent") // Subscribe for the 3 messages - const stream = yield* agent.subscribe + const stream = yield* agent.tapEventStream const collector = yield* stream.pipe( Stream.filter((e) => e._tag === "AssistantMessageEvent"), Stream.take(1), @@ -259,7 +259,7 @@ describe("MiniAgent", () => { // Wait for AssistantMessage (last one) to be processed yield* Fiber.join(collector) - const ctx = yield* agent.getReducedContext + const ctx = yield* agent.getState expect(ctx.messages.length).toBe(3) expect(ctx.messages[0]?.role).toBe("system") expect(ctx.messages[1]?.role).toBe("user") @@ -276,11 +276,11 @@ describe("MiniAgent", () => { yield* waitForEventTag(agent, "SessionStartedEvent") // SessionStarted = 1 - let ctx = yield* agent.getReducedContext + let ctx = yield* agent.getState expect(ctx.nextEventNumber).toBe(1) // Subscribe for the 3 UserMessages - const stream = yield* agent.subscribe + const stream = yield* agent.tapEventStream const collector = yield* stream.pipe( Stream.filter((e) => e._tag === "UserMessageEvent"), Stream.take(3), @@ -311,7 +311,7 @@ describe("MiniAgent", () => { // Wait for all 3 to be processed yield* Fiber.join(collector) - ctx = yield* agent.getReducedContext + ctx = yield* agent.getState expect(ctx.nextEventNumber).toBe(4) }).pipe( Effect.scoped, @@ -402,7 +402,7 @@ describe("MiniAgent", () => { yield* waitForEventTag(agent, "SessionStartedEvent") // Subscribe for the 2 UserMessages - const stream = yield* agent.subscribe + const stream = yield* agent.tapEventStream const collector = yield* stream.pipe( Stream.filter((e) => e._tag === "UserMessageEvent"), Stream.take(2), @@ -607,7 +607,7 @@ describe("MiniAgent", () => { const agent = yield* makeMiniAgent(testAgentName, testContextName) // Subscribe to events - when this completes, subscription MUST be established - const eventStream = yield* agent.subscribe + const eventStream = yield* agent.tapEventStream // Fork collection of events - no sleep needed because subscription is guaranteed const collectorFiber = yield* eventStream.pipe( @@ -646,7 +646,7 @@ describe("MiniAgent", () => { const agent = yield* makeMiniAgent(testAgentName, testContextName) // Subscribe and immediately add multiple events - const eventStream = yield* agent.subscribe + const eventStream = yield* agent.tapEventStream const collectorFiber = yield* eventStream.pipe( Stream.filter((e) => e._tag === "UserMessageEvent"), @@ -719,7 +719,7 @@ describe("MiniAgent", () => { yield* waitForEventTag(agent, "SessionStartedEvent") // Subscribe to events - we want to receive SessionEndedEvent - const eventStream = yield* agent.subscribe + const eventStream = yield* agent.tapEventStream // Fork a collector that waits for SessionEndedEvent const collectorFiber = yield* eventStream.pipe( diff --git a/test/tty.e2e.test.ts b/test/tty.e2e.test.ts index a356b21..269b23b 100644 --- a/test/tty.e2e.test.ts +++ b/test/tty.e2e.test.ts @@ -10,7 +10,9 @@ * they compete for terminal resources and cause flaky failures. */ import { Effect } from "effect" +import * as fs from "node:fs" import { resolve } from "node:path" +import * as path from "node:path" import { launchTerminal } from "tuistory" import { describe } from "vitest" @@ -57,6 +59,40 @@ describe.sequential("TTY Interactive Mode", () => { } ) + test( + "does not duplicate config events when reopening session", + { timeout: 20000 }, + async ({ llmEnv, testDir }) => { + const agentName = "tty-resume-test" + + const openSession = async () => { + const session = await launchTerminal({ + command: "bun", + args: [CLI_PATH, "--cwd", testDir, "chat", "-n", agentName], + cols: 80, + rows: 24, + env: testEnv(llmEnv) + }) + try { + await session.waitForText("Agent:", { timeout: 5000 }) + } finally { + await session.press(["ctrl", "c"]) + session.close() + } + } + + await openSession() + await openSession() + + const contextFile = path.join(testDir, ".mini-agent", "contexts", `${agentName}-v1.yaml`) + const contents = fs.readFileSync(contextFile, "utf8") + const llmMatches = contents.match(/SetLlmConfigEvent/g) ?? [] + const promptMatches = contents.match(/SystemPromptEvent/g) ?? [] + expect(llmMatches.length).toBe(1) + expect(promptMatches.length).toBe(1) + } + ) + // ============================================ // UI-only tests (no LLM needed, fast) // ============================================ @@ -98,10 +134,10 @@ describe.sequential("TTY Interactive Mode", () => { }) test("shows context name in footer", { timeout: 10000 }, async ({ llmEnv, testDir }) => { - const contextName = "my-special-context" + const agentName = "my-special-context" const session = await launchTerminal({ command: "bun", - args: [CLI_PATH, "--cwd", testDir, "chat", "-n", contextName], + args: [CLI_PATH, "--cwd", testDir, "chat", "-n", agentName], cols: 100, rows: 30, env: testEnv(llmEnv) @@ -109,8 +145,8 @@ describe.sequential("TTY Interactive Mode", () => { try { await session.waitForText("Starting new conversation", { timeout: 5000 }) - const text = await session.waitForText(contextName, { timeout: 3000 }) - expect(text).toContain(`Agent: ${contextName}`) + const text = await session.waitForText(agentName, { timeout: 3000 }) + expect(text).toContain(`Agent: ${agentName}`) } finally { await session.press(["ctrl", "c"]) session.close()