diff --git a/README.md b/README.md index 44b6105..ae4cd08 100644 --- a/README.md +++ b/README.md @@ -18,14 +18,18 @@ cat examples/pirate.jsonl | bun run mini-agent chat --script ## Modes -| Mode | Trigger | Input | Output | -|------|---------|-------|--------| -| Single-turn | `-m "msg"` | CLI arg | Plain text | -| Pipe | piped stdin | Plain text | Plain text | -| Script | `--script` | JSONL events | JSONL events | -| Interactive | TTY stdin | Prompts | Plain text | +Explicit mode selection via `--mode`: -Add `--raw` for JSONL output. Add `-n name` to persist conversation. +| Mode | `--mode` | Trigger (auto) | Input | Output | +|------|----------|----------------|-------|--------| +| TUI | `tui` | TTY stdin | Interactive prompts | Plain text | +| Single-turn | auto | `-m "msg"` | CLI arg | Plain text | +| Piped | `piped` | piped stdin | Plain text | Plain text | +| Script | `script` | `--script` | JSONL events | JSONL events | + +Auto mode (default) detects based on stdin and `-m` flag. + +Add `--raw` for JSONL output with all events. Add `-n name` to persist conversation. ## CLI Options @@ -35,9 +39,10 @@ See [`src/cli/commands.ts`](src/cli/commands.ts) for full definitions. |--------|-------|-------------| | `--name` | `-n` | Context name (persists conversation) | | `--message` | `-m` | Single message (non-interactive) | -| `--raw` | `-r` | Output as JSONL | -| `--script` | `-s` | JSONL in/out mode | -| `--show-ephemeral` | `-e` | Include streaming deltas | +| `--mode` | | Interaction mode: tui/script/piped/auto | +| `--raw` | `-r` | Output all events as JSONL | +| `--script` | `-s` | Alias for `--mode script` | +| `--show-ephemeral` | `-e` | Include streaming deltas in output | | `--image` | `-i` | Attach image file or URL | | `--config` | `-c` | YAML config file | | `--cwd` | | Working directory | @@ -78,20 +83,26 @@ LLM='{"apiFormat":"openai-chat-completions","model":"my-model","baseUrl":"https: ## Event Types -See [`src/context.model.ts`](src/context.model.ts) for schema definitions. +See [`src/domain.ts`](src/domain.ts) for schema definitions. **Input Events** (via stdin in script mode): - `UserMessage` - User message content - `SystemPrompt` - System behavior configuration - `FileAttachment` - Image or file attachment -**Output Events**: -- `TextDelta` - Streaming chunk (ephemeral) -- `AssistantMessage` - Complete response (persisted) +**Output Events** (with `--raw`): +- `SessionStartedEvent` - Session started +- `SetLlmConfigEvent` - LLM configuration for context +- `SystemPromptEvent` - System prompt configuration +- `UserMessageEvent` - User message +- `AgentTurnStartedEvent` - Turn started +- `TextDeltaEvent` - Streaming chunk (with `--show-ephemeral`) +- `AssistantMessageEvent` - Complete response +- `AgentTurnCompletedEvent` / `AgentTurnFailedEvent` - Turn ended **Internal Events** (persisted): -- `SetLlmConfig` - LLM configuration for context -- `LLMRequestInterrupted` - Partial response on cancellation +- `AgentTurnInterruptedEvent` - Partial response on cancellation +- `SessionEndedEvent` - Session ended ## Script Mode @@ -127,9 +138,11 @@ bun run mini-agent serve --port 3000 ``` Endpoints: -- `POST /context/:name` - Send JSONL body, receive SSE stream +- `POST /agent/:agentName` - Send JSON body `{"_tag":"UserMessage","content":"..."}`, receive SSE stream of all events (historical + turn) - `GET /health` - Health check +The SSE stream includes all events: `SessionStartedEvent`, `SetLlmConfigEvent`, `SystemPromptEvent`, prior messages, then the current turn events. + See [`src/http.ts`](src/http.ts) for implementation. ## Configuration diff --git a/src/new-architecture/agent-registry.ts b/src/agent-registry.ts similarity index 99% rename from src/new-architecture/agent-registry.ts rename to src/agent-registry.ts index 5da6a27..c428545 100644 --- a/src/new-architecture/agent-registry.ts +++ b/src/agent-registry.ts @@ -39,7 +39,7 @@ export class AgentRegistry extends Effect.Service()("@mini-agent/ const agents = yield* Ref.make(new Map()) // Generate context name from agent name - const makeContextName = (agentName: AgentName): ContextName => `${agentName}-v1` as ContextName + const makeContextName = (agentName: AgentName): ContextName => `${agentName}` as ContextName // Track in-progress creations using Deferred for proper synchronization const creationLocks = yield* Ref.make( diff --git a/src/cli/chat-ui.ts b/src/cli/chat-ui.ts index 4a6b290..c397958 100644 --- a/src/cli/chat-ui.ts +++ b/src/cli/chat-ui.ts @@ -4,22 +4,67 @@ * Interactive chat with interruptible LLM streaming. * Return during streaming interrupts (with optional new message); Escape exits. */ -import type { AiError, LanguageModel } from "@effect/ai" -import type { Error as PlatformError, FileSystem } from "@effect/platform" -import { Cause, Context, Effect, Fiber, Layer, Mailbox, Stream } from "effect" -import { is } from "effect/Schema" +import type { Error as PlatformError } from "@effect/platform" +import { Cause, Context, DateTime, Effect, Fiber, Layer, Mailbox, Option, Stream } from "effect" +import { AgentRegistry } from "../agent-registry.ts" import { - AssistantMessageEvent, + type AgentName, + AgentTurnInterruptedEvent, type ContextEvent, - LLMRequestInterruptedEvent, - TextDeltaEvent, - UserMessageEvent -} from "../context.model.ts" -import { ContextService } from "../context.service.ts" -import type { ContextLoadError, ContextSaveError } from "../errors.ts" -import type { CurrentLlmConfig } from "../llm-config.ts" -import { streamLLMResponse } from "../llm.ts" -import { type ChatController, runOpenTUIChat } from "./components/opentui-chat.tsx" + type ContextName, + type ContextSaveError, + EventBuilder, + type EventId, + type ReducedContext, + type ReducerError +} from "../domain.ts" +import { type ChatController, type ChatEvent, runOpenTUIChat } from "./components/opentui-chat.tsx" + +/** + * Map a ContextEvent to the TUI ChatEvent format. + * Returns null for events that shouldn't be displayed. + */ +const mapEventToTui = (e: ContextEvent): ChatEvent | null => { + switch (e._tag) { + case "UserMessageEvent": + return { _tag: "UserMessage", content: e.content } + case "AssistantMessageEvent": + return { _tag: "AssistantMessage", content: e.content } + case "SystemPromptEvent": + return { _tag: "SystemPrompt", content: e.content } + case "TextDeltaEvent": + return { _tag: "TextDelta", delta: e.delta } + case "AgentTurnInterruptedEvent": + return { + _tag: "LLMRequestInterrupted", + requestId: e.id, + reason: e.reason, + partialResponse: Option.getOrElse(e.partialResponse, () => "") + } + case "FileAttachmentEvent": { + const fileName = Option.getOrUndefined(e.fileName) + return fileName + ? { _tag: "FileAttachment", source: e.source, fileName } + : { _tag: "FileAttachment", source: e.source } + } + case "SessionStartedEvent": + return { _tag: "SessionStarted" } + case "SessionEndedEvent": + return { _tag: "SessionEnded" } + case "AgentTurnStartedEvent": + return { _tag: "AgentTurnStarted", turnNumber: e.turnNumber } + case "AgentTurnCompletedEvent": + return { _tag: "AgentTurnCompleted", turnNumber: e.turnNumber, durationMs: e.durationMs } + case "AgentTurnFailedEvent": + return { _tag: "AgentTurnFailed", turnNumber: e.turnNumber, error: e.error } + case "SetLlmConfigEvent": + return { _tag: "SetLlmConfig", model: e.model, provider: e.providerId } + case "SetTimeoutEvent": + return { _tag: "SetTimeout", timeoutMs: e.timeoutMs } + default: + return null + } +} type ChatSignal = | { readonly _tag: "Input"; readonly text: string } @@ -29,26 +74,29 @@ export class ChatUI extends Context.Tag("@app/ChatUI")< ChatUI, { readonly runChat: ( - contextName: string - ) => Effect.Effect< - void, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig - > + agentName: AgentName + ) => Effect.Effect } >() { static readonly layer = Layer.effect( ChatUI, - Effect.gen(function*() { - const contextService = yield* ContextService + Effect.sync(() => { + const runChat = Effect.fn("ChatUI.runChat")(function*(agentName: AgentName) { + const registry = yield* AgentRegistry + const agent = yield* registry.getOrCreate(agentName) - const runChat = Effect.fn("ChatUI.runChat")(function*(contextName: string) { - const existingEvents = yield* contextService.load(contextName) + // Load existing events to display in chat + const existingEvents = yield* agent.getEvents const mailbox = yield* Mailbox.make() + // Map ContextEvent to the format expected by OpenTUI chat + const tuiEvents = existingEvents.map((e) => mapEventToTui(e)).filter((e): e is NonNullable => + e !== null + ) + const chat = yield* Effect.promise(() => - runOpenTUIChat(contextName, existingEvents, { + runOpenTUIChat(agentName, tuiEvents, { onSubmit: (text) => { mailbox.unsafeOffer({ _tag: "Input", text }) }, @@ -58,7 +106,7 @@ export class ChatUI extends Context.Tag("@app/ChatUI")< }) ) - yield* runChatLoop(contextName, contextService, chat, mailbox).pipe( + yield* runChatLoop(agentName, agent, chat, mailbox).pipe( Effect.catchAll((error) => Effect.logError("Chat error", { error }).pipe(Effect.as(undefined))), Effect.catchAllCause((cause) => Cause.isInterruptedOnly(cause) ? Effect.void : Effect.logError("Chat loop error", cause) @@ -74,19 +122,23 @@ export class ChatUI extends Context.Tag("@app/ChatUI")< static readonly testLayer = Layer.sync(ChatUI, () => ChatUI.of({ runChat: () => Effect.void })) } +interface MiniAgentInterface { + readonly agentName: AgentName + readonly contextName: ContextName + readonly addEvent: (event: ContextEvent) => Effect.Effect + readonly events: Stream.Stream + readonly getReducedContext: Effect.Effect +} + const runChatLoop = ( - contextName: string, - contextService: Context.Tag.Service, + agentName: AgentName, + agent: MiniAgentInterface, chat: ChatController, mailbox: Mailbox.Mailbox -): Effect.Effect< - void, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig -> => +): Effect.Effect => Effect.fn("ChatUI.runChatLoop")(function*() { while (true) { - const result = yield* runChatTurn(contextName, contextService, chat, mailbox, null) + const result = yield* runChatTurn(agentName, agent, chat, mailbox, null) if (result._tag === "exit") { return } @@ -98,16 +150,12 @@ type TurnResult = | { readonly _tag: "exit" } const runChatTurn = ( - contextName: string, - contextService: Context.Tag.Service, + agentName: AgentName, + agent: MiniAgentInterface, chat: ChatController, mailbox: Mailbox.Mailbox, pendingMessage: string | null -): Effect.Effect< - TurnResult, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig -> => +): Effect.Effect => Effect.fn("ChatUI.runChatTurn")(function*() { // Get message either from pending or by waiting for input let userMessage: string @@ -128,35 +176,34 @@ const runChatTurn = ( return { _tag: "continue" } as const } - const userEvent = new UserMessageEvent({ content: userMessage }) + const ctx = yield* agent.getReducedContext + const userEvent = EventBuilder.userMessage(agentName, agent.contextName, ctx.nextEventNumber, userMessage) - yield* contextService.persistEvent(contextName, userEvent) - chat.addEvent(userEvent) + // Add user message to TUI and agent + chat.addEvent({ _tag: "UserMessage", content: userMessage }) - const events = yield* contextService.load(contextName) let accumulatedText = "" const streamFiber = yield* Effect.fork( - streamLLMResponse(events).pipe( - Stream.tap((event: ContextEvent) => + agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((event) => Effect.sync(() => { - if (is(TextDeltaEvent)(event)) { + if (event._tag === "TextDeltaEvent") { accumulatedText += event.delta - chat.addEvent(event) + chat.addEvent({ _tag: "TextDelta", delta: event.delta }) + } else if (event._tag === "AssistantMessageEvent") { + chat.addEvent({ _tag: "AssistantMessage", content: event.content }) } }) ), - Stream.filter(is(AssistantMessageEvent)), - Stream.tap((event) => - Effect.gen(function*() { - yield* contextService.persistEvent(contextName, event) - chat.addEvent(event) - }) - ), Stream.runDrain ) ) + // Add event to trigger LLM turn + yield* agent.addEvent(userEvent) + const result = yield* awaitStreamCompletion(streamFiber, mailbox) if (result._tag === "completed") { @@ -165,30 +212,52 @@ const runChatTurn = ( if (result._tag === "exit") { if (accumulatedText.length > 0) { - const interruptedEvent = new LLMRequestInterruptedEvent({ - requestId: crypto.randomUUID(), + const updatedCtx = yield* agent.getReducedContext + const interruptedEvent = new AgentTurnInterruptedEvent({ + id: `${agent.contextName}:${String(updatedCtx.nextEventNumber).padStart(4, "0")}` as EventId, + timestamp: DateTime.unsafeNow(), + agentName, + parentEventId: Option.none(), + triggersAgentTurn: false, + turnNumber: updatedCtx.currentTurnNumber, + reason: "user_cancel", + partialResponse: Option.some(accumulatedText) + }) + yield* agent.addEvent(interruptedEvent) + chat.addEvent({ + _tag: "LLMRequestInterrupted", + requestId: interruptedEvent.id, reason: "user_cancel", partialResponse: accumulatedText }) - yield* contextService.persistEvent(contextName, interruptedEvent) - chat.addEvent(interruptedEvent) } return { _tag: "exit" } as const } // result._tag === "interrupted" - user hit return during streaming if (accumulatedText.length > 0) { - const interruptedEvent = new LLMRequestInterruptedEvent({ - requestId: crypto.randomUUID(), + const updatedCtx = yield* agent.getReducedContext + const interruptedEvent = new AgentTurnInterruptedEvent({ + id: `${agent.contextName}:${String(updatedCtx.nextEventNumber).padStart(4, "0")}` as EventId, + timestamp: DateTime.unsafeNow(), + agentName, + parentEventId: Option.none(), + triggersAgentTurn: false, + turnNumber: updatedCtx.currentTurnNumber, + reason: result.newMessage ? "user_new_message" : "user_cancel", + partialResponse: Option.some(accumulatedText) + }) + yield* agent.addEvent(interruptedEvent) + chat.addEvent({ + _tag: "LLMRequestInterrupted", + requestId: interruptedEvent.id, reason: result.newMessage ? "user_new_message" : "user_cancel", partialResponse: accumulatedText }) - yield* contextService.persistEvent(contextName, interruptedEvent) - chat.addEvent(interruptedEvent) } if (result.newMessage) { - return yield* runChatTurn(contextName, contextService, chat, mailbox, result.newMessage) + return yield* runChatTurn(agentName, agent, chat, mailbox, result.newMessage) } return { _tag: "continue" } as const @@ -200,9 +269,9 @@ type StreamResult = | { readonly _tag: "interrupted"; readonly newMessage: string | null } const awaitStreamCompletion = ( - fiber: Fiber.RuntimeFiber, + fiber: Fiber.RuntimeFiber, mailbox: Mailbox.Mailbox -): Effect.Effect => +): Effect.Effect => Effect.fn("ChatUI.awaitStreamCompletion")(function*() { const waitForFiber = Fiber.join(fiber).pipe(Effect.as({ _tag: "completed" } as StreamResult)) const waitForInterrupt = Effect.gen(function*() { diff --git a/src/cli/commands.ts b/src/cli/commands.ts index 92f939e..00912f7 100644 --- a/src/cli/commands.ts +++ b/src/cli/commands.ts @@ -7,21 +7,12 @@ import { type Prompt, Telemetry } from "@effect/ai" import { Command, Options, Prompt as CliPrompt } from "@effect/cli" import { type Error as PlatformError, FileSystem, HttpServer, Terminal } from "@effect/platform" import { BunHttpServer, BunStream } from "@effect/platform-bun" -import { Chunk, Console, Effect, Layer, Option, Schema, Stream } from "effect" +import { Chunk, Console, Effect, Fiber, Layer, Option, Schema, Stream } from "effect" +import { AgentRegistry } from "../agent-registry.ts" import { AppConfig, resolveBaseDir } from "../config.ts" -import { - AssistantMessageEvent, - type ContextEvent, - FileAttachmentEvent, - type InputEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "../context.model.ts" -import { ContextService } from "../context.service.ts" +import { type AgentName, type ContextEvent, EventBuilder, SystemPromptEvent, UserMessageEvent } from "../domain.ts" import { makeRouter } from "../http.ts" import { layercodeCommand } from "../layercode/index.ts" -import { AgentServer } from "../server.service.ts" import { printTraceLinks } from "../tracing.ts" export const configFileOption = Options.file("config").pipe( @@ -79,9 +70,21 @@ const showEphemeralOption = Options.boolean("show-ephemeral").pipe( Options.withDefault(false) ) +/** CLI interaction modes */ +const ModeOption = Schema.Literal("tui", "script", "piped", "auto") +type ModeOption = typeof ModeOption.Type + +const modeOption = Options.choice("mode", ["tui", "script", "piped", "auto"]).pipe( + Options.withDescription( + "Interaction mode: tui (interactive terminal), script (JSONL in/out), piped (plain text in/out), auto (detect)" + ), + Options.withDefault("auto" as const) +) + +// Keep --script as an alias for --mode script for backwards compatibility const scriptOption = Options.boolean("script").pipe( Options.withAlias("s"), - Options.withDescription("Script mode: read JSONL events from stdin, output JSONL events"), + Options.withDescription("Alias for --mode script: read JSONL events from stdin, output JSONL events"), Options.withDefault(false) ) @@ -127,79 +130,66 @@ const handleEvent = ( const terminal = yield* Terminal.Terminal if (options.raw) { - if (Schema.is(TextDeltaEvent)(event) && !options.showEphemeral) { + if (event._tag === "TextDeltaEvent" && !options.showEphemeral) { return } yield* Console.log(JSON.stringify(event)) return } - if (Schema.is(TextDeltaEvent)(event)) { + if (event._tag === "TextDeltaEvent") { yield* terminal.display(event.delta) return } - if (Schema.is(AssistantMessageEvent)(event)) { + if (event._tag === "AssistantMessageEvent") { yield* Console.log("") return } }) -/** Run the event stream, handling each event */ -const runEventStream = ( - contextName: string, - userMessage: string, - options: OutputOptions, - imageInput?: string -) => - Effect.gen(function*() { - const contextService = yield* ContextService - const inputEvents: Array = [] - - if (imageInput) { - const mediaType = getMediaType(imageInput) - const fileName = getFileName(imageInput) - - if (isUrl(imageInput)) { - inputEvents.push( - new FileAttachmentEvent({ - source: { type: "url", url: imageInput }, - mediaType, - fileName - }) - ) - } else { - inputEvents.push( - new FileAttachmentEvent({ - source: { type: "file", path: imageInput }, - mediaType, - fileName - }) - ) - } - } - - inputEvents.push(new UserMessageEvent({ content: userMessage })) - - yield* contextService.addEvents(contextName, inputEvents).pipe( - Stream.runForEach((event) => handleEvent(event, options)) - ) - }) - /** CLI interaction mode - determines how input/output is handled */ -const InteractionMode = Schema.Literal("single-turn", "pipe", "script", "tty-interactive") +const InteractionMode = Schema.Literal("single-turn", "piped", "script", "tui") type InteractionMode = typeof InteractionMode.Type +/** + * Determine interaction mode from options. + * + * Mode precedence: + * 1. Explicit --mode (if not "auto") + * 2. --script flag (alias for --mode script) + * 3. -m message provided → single-turn + * 4. stdin is TTY → tui + * 5. stdin is piped → piped + */ const determineMode = (options: { + mode: ModeOption message: Option.Option script: boolean }): InteractionMode => { + // Explicit mode takes precedence (unless auto) + if (options.mode !== "auto") { + return options.mode + } + + // --script flag is alias for --mode script + if (options.script) { + return "script" + } + + // -m message means single-turn const hasMessage = Option.isSome(options.message) && Option.getOrElse(options.message, () => "").trim() !== "" + if (hasMessage) { + return "single-turn" + } - if (hasMessage) return "single-turn" - if (options.script) return "script" - if (process.stdin.isTTY) return "tty-interactive" - return "pipe" + // TTY means interactive TUI + if (process.stdin.isTTY) { + return "tui" + } + + // Otherwise piped stdin + return "piped" } const utf8Decoder = new TextDecoder("utf-8") @@ -210,7 +200,24 @@ const readAllStdin: Effect.Effect = BunStream.stdin.pipe( Effect.map((chunks) => Chunk.join(chunks, "").trim()) ) -const ScriptInputEvent = Schema.Union(UserMessageEvent, SystemPromptEvent) +// Script input events - simplified format for external tools +// Accept both the full event format (with all fields) and simplified format (just _tag and content) +const SimpleUserMessage = Schema.Struct({ + _tag: Schema.Literal("UserMessage"), + content: Schema.String +}) +const SimpleSystemPrompt = Schema.Struct({ + _tag: Schema.Literal("SystemPrompt"), + content: Schema.String +}) +const ScriptInputEvent = Schema.Union( + UserMessageEvent, + SystemPromptEvent, + SimpleUserMessage, + SimpleSystemPrompt +) + +type ScriptInputEvent = typeof ScriptInputEvent.Type const stdinEvents = BunStream.stdin.pipe( Stream.mapChunks(Chunk.map((bytes) => utf8Decoder.decode(bytes))), @@ -223,20 +230,43 @@ const stdinEvents = BunStream.stdin.pipe( ) ) -const scriptInteractiveLoop = (contextName: string, options: OutputOptions) => +const isUserMessage = (e: ScriptInputEvent): e is typeof UserMessageEvent.Type | typeof SimpleUserMessage.Type => + e._tag === "UserMessageEvent" || e._tag === "UserMessage" + +const isSystemPrompt = (e: ScriptInputEvent): e is typeof SystemPromptEvent.Type | typeof SimpleSystemPrompt.Type => + e._tag === "SystemPromptEvent" || e._tag === "SystemPrompt" + +const scriptInteractiveLoop = (agentName: AgentName, options: OutputOptions) => Effect.gen(function*() { - const contextService = yield* ContextService + const registry = yield* AgentRegistry + const agent = yield* registry.getOrCreate(agentName) yield* stdinEvents.pipe( Stream.mapEffect((event) => Effect.gen(function*() { + // Echo the received event yield* Console.log(JSON.stringify(event)) - if (Schema.is(UserMessageEvent)(event)) { - yield* contextService.addEvents(contextName, [event]).pipe( - Stream.runForEach((outputEvent) => handleEvent(outputEvent, options)) + if (isUserMessage(event)) { + const ctx = yield* agent.getReducedContext + const userEvent = EventBuilder.userMessage( + agentName, + agent.contextName, + ctx.nextEventNumber, + event.content + ) + + // Subscribe to events before adding user message + const streamFiber = yield* agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((e) => handleEvent(e, options)), + Stream.runDrain, + Effect.fork ) - } else if (Schema.is(SystemPromptEvent)(event)) { + + yield* agent.addEvent(userEvent) + yield* Fiber.join(streamFiber) + } else if (isSystemPrompt(event)) { yield* Effect.logDebug("SystemPrompt events in script mode are echoed but not persisted") } }) @@ -248,8 +278,19 @@ const scriptInteractiveLoop = (contextName: string, options: OutputOptions) => const NEW_CONTEXT_VALUE = "__new__" const selectOrCreateContext = Effect.gen(function*() { - const contextService = yield* ContextService - const contexts = yield* contextService.list() + const fs = yield* FileSystem.FileSystem + const config = yield* AppConfig + const baseDir = resolveBaseDir(config) + const contextsDir = `${baseDir}/contexts` + + const exists = yield* fs.exists(contextsDir) + if (!exists) { + yield* Console.log("No existing contexts found.") + return yield* CliPrompt.text({ message: "Enter a name for your new context" }) + } + + const entries = yield* fs.readDirectory(contextsDir) + const contexts = entries.filter((name) => name.endsWith(".yaml")).map((name) => name.replace(/\.yaml$/, "")) if (contexts.length === 0) { yield* Console.log("No existing contexts found.") @@ -295,18 +336,92 @@ const makeChatUILayer = () => }) ) +/** Run the event stream for a single message */ +const runSingleTurn = ( + agentName: AgentName, + userMessage: string, + options: OutputOptions, + imageInput?: string +) => + Effect.gen(function*() { + const registry = yield* AgentRegistry + const agent = yield* registry.getOrCreate(agentName) + + // In raw mode, first output all historical events (including SessionStartedEvent) + if (options.raw) { + const historicalEvents = yield* agent.getEvents + for (const event of historicalEvents) { + yield* handleEvent(event, options) + } + } + + const ctx = yield* agent.getReducedContext + + // If there's an image, add it first + if (imageInput) { + const mediaType = getMediaType(imageInput) + const fileName = getFileName(imageInput) + const source = isUrl(imageInput) + ? { type: "url" as const, url: imageInput } + : { type: "file" as const, path: imageInput } + + const fileEvent = EventBuilder.fileAttachment( + agentName, + agent.contextName, + ctx.nextEventNumber, + source, + mediaType, + Option.some(fileName) + ) + yield* agent.addEvent(fileEvent) + // Output the file event in raw mode + if (options.raw) { + yield* handleEvent(fileEvent, options) + } + } + + // Get updated context after adding file + const updatedCtx = yield* agent.getReducedContext + + // Subscribe to events BEFORE adding the user message + const streamFiber = yield* agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((e) => handleEvent(e, options)), + Stream.runDrain, + Effect.fork + ) + + // Small delay to ensure subscription is active + yield* Effect.sleep("10 millis") + + // Add user message with triggersAgentTurn=true + const userEvent = EventBuilder.userMessage( + agentName, + agent.contextName, + updatedCtx.nextEventNumber, + userMessage + ) + yield* agent.addEvent(userEvent) + + // Wait for stream to complete + yield* Fiber.join(streamFiber) + }) + const runChat = (options: { name: Option.Option message: Option.Option image: Option.Option + mode: ModeOption raw: boolean script: boolean showEphemeral: boolean }) => Effect.gen(function*() { - yield* Effect.logDebug("Starting chat session") const mode = determineMode(options) + yield* Effect.logDebug("Starting chat session", { mode, explicitMode: options.mode }) + const contextName = Option.getOrElse(options.name, generateRandomContextName) + const agentName = contextName as AgentName const imagePath = Option.getOrNull(options.image) ?? undefined const outputOptions: OutputOptions = { @@ -317,27 +432,27 @@ const runChat = (options: { switch (mode) { case "single-turn": { const message = Option.getOrElse(options.message, () => "") - yield* runEventStream(contextName, message, outputOptions, imagePath) + yield* runSingleTurn(agentName, message, outputOptions, imagePath) if (!outputOptions.raw) { yield* printTraceLinks } break } - case "pipe": { + case "piped": { const input = yield* readAllStdin if (input !== "") { - yield* runEventStream(contextName, input, { raw: false, showEphemeral: false }, imagePath) + yield* runSingleTurn(agentName, input, outputOptions, imagePath) } break } case "script": { - yield* scriptInteractiveLoop(contextName, outputOptions) + yield* scriptInteractiveLoop(agentName, outputOptions) break } - case "tty-interactive": { + case "tui": { const resolvedName = Option.isSome(options.name) ? contextName : yield* selectOrCreateContext @@ -345,7 +460,7 @@ const runChat = (options: { const { ChatUI } = yield* Effect.promise(() => import("./chat-ui.ts")) const chatUI = yield* ChatUI - yield* chatUI.runChat(resolvedName).pipe( + yield* chatUI.runChat(resolvedName as AgentName).pipe( Effect.catchAllCause(() => Effect.void), Effect.ensuring(printTraceLinks.pipe(Effect.flatMap(() => Console.log("\nGoodbye!")))) ) @@ -423,13 +538,18 @@ const chatCommand = Command.make( name: nameOption, message: messageOption, image: imageOption, + mode: modeOption, raw: rawOption, script: scriptOption, showEphemeral: showEphemeralOption }, - ({ image, message, name, raw, script, showEphemeral }) => - runChat({ image, message, name, raw, script, showEphemeral }) -).pipe(Command.withDescription("Chat with an AI assistant using persistent context history")) + ({ image, message, mode, name, raw, script, showEphemeral }) => + runChat({ image, message, mode: mode as ModeOption, name, raw, script, showEphemeral }) +).pipe( + Command.withDescription( + "Chat with an AI assistant. Modes: tui (interactive), script (JSONL), piped (plain text), auto (detect)" + ) +) const logTestCommand = Command.make( "log-test", @@ -485,7 +605,7 @@ const hostOption = Options.text("host").pipe( Options.optional ) -/** Generic serve command - starts HTTP server with /context/:name endpoint */ +/** Generic serve command - starts HTTP server with /agent/:agentName endpoint */ export const serveCommand = Command.make( "serve", { @@ -501,37 +621,31 @@ export const serveCommand = Command.make( yield* Console.log(`Starting HTTP server on http://${actualHost}:${actualPort}`) yield* Console.log("") yield* Console.log("Endpoints:") - yield* Console.log(" POST /context/:contextName") - yield* Console.log(" Send JSONL events, receive SSE stream") - yield* Console.log(" Content-Type: application/x-ndjson") + yield* Console.log(" POST /agent/:agentName") + yield* Console.log(" Send JSON message, receive SSE stream") + yield* Console.log(" Content-Type: application/json") yield* Console.log("") yield* Console.log(" GET /health") yield* Console.log(" Health check endpoint") yield* Console.log("") yield* Console.log("Example:") - yield* Console.log(` curl -X POST http://${actualHost}:${actualPort}/context/test \\`) - yield* Console.log(` -H "Content-Type: application/x-ndjson" \\`) + yield* Console.log(` curl -X POST http://${actualHost}:${actualPort}/agent/test \\`) + yield* Console.log(` -H "Content-Type: application/json" \\`) yield* Console.log(` -d '{"_tag":"UserMessage","content":"hello"}'`) yield* Console.log("") // Create server layer with configured port/host const serverLayer = BunHttpServer.layer({ port: actualPort, hostname: actualHost }) - // Create layers for the server - const layers = Layer.mergeAll( - serverLayer, - AgentServer.layer - ) - // Use Layer.launch to keep the server running return yield* Layer.launch( HttpServer.serve(makeRouter).pipe( - Layer.provide(layers) + Layer.provide(serverLayer) ) ) }) ).pipe( - Command.withDescription("Start generic HTTP server for agent requests") + Command.withDescription("Start HTTP server for agent requests") ) const rootCommand = Command.make( diff --git a/src/cli/components/opentui-chat.tsx b/src/cli/components/opentui-chat.tsx index a037cca..97ce369 100644 --- a/src/cli/components/opentui-chat.tsx +++ b/src/cli/components/opentui-chat.tsx @@ -2,7 +2,7 @@ * OpenTUI Chat Component * * Architecture: - * - ContextEvent[] dispatched via controller.addEvent() + * - Event objects dispatched via controller.addEvent() * - feedReducer folds each event into FeedItem[] (accumulated state) * - Feed component renders feedItems (pure render, knows nothing about events) * @@ -15,8 +15,28 @@ import { Option, Schema } from "effect" import { createCliRenderer, TextAttributes } from "@opentui/core" import { createRoot } from "@opentui/react/renderer" import { memo, useCallback, useMemo, useReducer, useRef, useState } from "react" -import type { ContextEvent, PersistedEvent } from "../../context.model.ts" -import { AttachmentSource } from "../../context.model.ts" +import { AttachmentSource } from "../../domain.ts" + +/** + * Simplified event interface for the chat UI. + * This decouples the TUI from the full ContextEvent structure. + */ +export interface ChatEvent { + _tag: string + content?: string + delta?: string + partialResponse?: string + reason?: string + source?: { type: "file"; path: string } | { type: "url"; url: string } + fileName?: string + requestId?: string + turnNumber?: number + durationMs?: number + error?: string + model?: string + provider?: string + timeoutMs?: number +} /** User's message in the conversation */ class UserMessageItem extends Schema.TaggedClass()("UserMessageItem", { @@ -54,6 +74,50 @@ class FileAttachmentItem extends Schema.TaggedClass()("FileA isHistory: Schema.Boolean }) {} +/** System prompt configuration */ +class SystemPromptItem extends Schema.TaggedClass()("SystemPromptItem", { + id: Schema.String, + content: Schema.String, + isHistory: Schema.Boolean +}) {} + +/** Session started lifecycle event */ +class SessionStartedItem extends Schema.TaggedClass()("SessionStartedItem", { + id: Schema.String, + isHistory: Schema.Boolean +}) {} + +/** Agent turn started lifecycle event */ +class AgentTurnStartedItem extends Schema.TaggedClass()("AgentTurnStartedItem", { + id: Schema.String, + turnNumber: Schema.Number, + isHistory: Schema.Boolean +}) {} + +/** Agent turn completed lifecycle event */ +class AgentTurnCompletedItem extends Schema.TaggedClass()("AgentTurnCompletedItem", { + id: Schema.String, + turnNumber: Schema.Number, + durationMs: Schema.Number, + isHistory: Schema.Boolean +}) {} + +/** Agent turn failed lifecycle event */ +class AgentTurnFailedItem extends Schema.TaggedClass()("AgentTurnFailedItem", { + id: Schema.String, + turnNumber: Schema.Number, + error: Schema.String, + isHistory: Schema.Boolean +}) {} + +/** LLM config change event */ +class SetLlmConfigItem extends Schema.TaggedClass()("SetLlmConfigItem", { + id: Schema.String, + model: Schema.String, + provider: Schema.String, + isHistory: Schema.Boolean +}) {} + /** Fallback for unknown event types - displays muted warning */ class UnknownEventItem extends Schema.TaggedClass()("UnknownEventItem", { id: Schema.String, @@ -67,11 +131,17 @@ const FeedItem = Schema.Union( AssistantMessageItem, LLMInterruptionItem, FileAttachmentItem, + SystemPromptItem, + SessionStartedItem, + AgentTurnStartedItem, + AgentTurnCompletedItem, + AgentTurnFailedItem, + SetLlmConfigItem, UnknownEventItem ) type FeedItem = typeof FeedItem.Type -type FeedAction = { event: ContextEvent; isHistory: boolean } +type FeedAction = { event: ChatEvent; isHistory: boolean } /** * Folds a context event into accumulated feed items. @@ -83,38 +153,39 @@ function feedReducer(items: FeedItem[], action: FeedAction): FeedItem[] { switch (event._tag) { case "TextDelta": { const last = items.at(-1) - if (last?._tag === "InProgressAssistantItem") { + if (last && "_tag" in last && last._tag === "InProgressAssistantItem") { + const lastItem = last as InProgressAssistantItem return [ ...items.slice(0, -1), - new InProgressAssistantItem({ ...last, text: last.text + event.delta }) + new InProgressAssistantItem({ ...lastItem, text: lastItem.text + (event.delta ?? "") }) ] } return [ ...items, - new InProgressAssistantItem({ id: crypto.randomUUID(), text: event.delta }) + new InProgressAssistantItem({ id: crypto.randomUUID(), text: event.delta ?? "" }) ] } case "AssistantMessage": { - const filtered = items.filter((i) => i._tag !== "InProgressAssistantItem") + const filtered = items.filter((i) => "_tag" in i && i._tag !== "InProgressAssistantItem") return [ ...filtered, new AssistantMessageItem({ id: crypto.randomUUID(), - content: event.content, + content: event.content ?? "", isHistory }) ] } case "LLMRequestInterrupted": { - const filtered = items.filter((i) => i._tag !== "InProgressAssistantItem") + const filtered = items.filter((i) => "_tag" in i && i._tag !== "InProgressAssistantItem") return [ ...filtered, new LLMInterruptionItem({ id: crypto.randomUUID(), - partialResponse: event.partialResponse, - reason: event.reason, + partialResponse: event.partialResponse ?? "", + reason: event.reason ?? "unknown", isHistory }) ] @@ -125,7 +196,7 @@ function feedReducer(items: FeedItem[], action: FeedAction): FeedItem[] { ...items, new UserMessageItem({ id: crypto.randomUUID(), - content: event.content, + content: event.content ?? "", isHistory }) ] @@ -135,14 +206,77 @@ function feedReducer(items: FeedItem[], action: FeedAction): FeedItem[] { ...items, new FileAttachmentItem({ id: crypto.randomUUID(), - source: event.source, + source: event.source ?? { type: "file", path: "" }, fileName: Option.fromNullable(event.fileName), isHistory }) ] case "SystemPrompt": + return [ + ...items, + new SystemPromptItem({ + id: crypto.randomUUID(), + content: event.content ?? "", + isHistory + }) + ] + + case "SessionStarted": + return [ + ...items, + new SessionStartedItem({ + id: crypto.randomUUID(), + isHistory + }) + ] + + case "AgentTurnStarted": + return [ + ...items, + new AgentTurnStartedItem({ + id: crypto.randomUUID(), + turnNumber: event.turnNumber ?? 0, + isHistory + }) + ] + + case "AgentTurnCompleted": + return [ + ...items, + new AgentTurnCompletedItem({ + id: crypto.randomUUID(), + turnNumber: event.turnNumber ?? 0, + durationMs: event.durationMs ?? 0, + isHistory + }) + ] + + case "AgentTurnFailed": + return [ + ...items, + new AgentTurnFailedItem({ + id: crypto.randomUUID(), + turnNumber: event.turnNumber ?? 0, + error: event.error ?? "Unknown error", + isHistory + }) + ] + case "SetLlmConfig": + return [ + ...items, + new SetLlmConfigItem({ + id: crypto.randomUUID(), + model: event.model ?? "", + provider: event.provider ?? "", + isHistory + }) + ] + + case "SessionEnded": + case "SetTimeout": + // Don't display these events in the UI return items default: @@ -150,7 +284,7 @@ function feedReducer(items: FeedItem[], action: FeedAction): FeedItem[] { ...items, new UnknownEventItem({ id: crypto.randomUUID(), - eventTag: (event as { _tag: string })._tag, + eventTag: event._tag, isHistory }) ] @@ -256,6 +390,66 @@ const FileAttachmentRenderer = memo<{ item: FileAttachmentItem }>(({ item }) => ) }) +const SystemPromptRenderer = memo<{ item: SystemPromptItem }>(({ item }) => { + const textColor = item.isHistory ? colors.dim : colors.yellow + + return ( + + ⚙️ System: {item.content.slice(0, 60)}{item.content.length > 60 ? "..." : ""} + + ) +}) + +const SessionStartedRenderer = memo<{ item: SessionStartedItem }>(({ item }) => { + const textColor = item.isHistory ? colors.dim : colors.yellow + + return ( + + 🔵 Session started + + ) +}) + +const AgentTurnStartedRenderer = memo<{ item: AgentTurnStartedItem }>(({ item }) => { + const textColor = item.isHistory ? colors.dim : colors.yellow + + return ( + + ▶️ Turn {item.turnNumber} started + + ) +}) + +const AgentTurnCompletedRenderer = memo<{ item: AgentTurnCompletedItem }>(({ item }) => { + const textColor = item.isHistory ? colors.dim : colors.yellow + + return ( + + ✅ Turn {item.turnNumber} completed ({item.durationMs}ms) + + ) +}) + +const AgentTurnFailedRenderer = memo<{ item: AgentTurnFailedItem }>(({ item }) => { + const textColor = item.isHistory ? colors.dimRed : colors.red + + return ( + + ❌ Turn {item.turnNumber} failed: {item.error} + + ) +}) + +const SetLlmConfigRenderer = memo<{ item: SetLlmConfigItem }>(({ item }) => { + const textColor = item.isHistory ? colors.dim : colors.yellow + + return ( + + 🤖 LLM: {item.provider}:{item.model} + + ) +}) + const UnknownEventRenderer = memo<{ item: UnknownEventItem }>(({ item }) => { return ( @@ -265,19 +459,35 @@ const UnknownEventRenderer = memo<{ item: UnknownEventItem }>(({ item }) => { }) const FeedItemRenderer = memo<{ item: FeedItem }>(({ item }) => { + if (!("_tag" in item)) return null + switch (item._tag) { case "UserMessageItem": - return + return case "InProgressAssistantItem": - return + return case "AssistantMessageItem": - return + return case "LLMInterruptionItem": - return + return case "FileAttachmentItem": - return + return + case "SystemPromptItem": + return + case "SessionStartedItem": + return + case "AgentTurnStartedItem": + return + case "AgentTurnCompletedItem": + return + case "AgentTurnFailedItem": + return + case "SetLlmConfigItem": + return case "UnknownEventItem": - return + return + default: + return null } }) @@ -286,6 +496,11 @@ interface FeedProps { hasHistory: boolean } +const getItemId = (item: FeedItem): string => { + if ("id" in item) return item.id as string + return crypto.randomUUID() +} + const Feed = memo(({ feedItems, hasHistory }) => { return ( @@ -298,7 +513,7 @@ const Feed = memo(({ feedItems, hasHistory }) => { )} {feedItems.map((item) => ( - + ))} ) @@ -310,17 +525,19 @@ export interface ChatCallbacks { } export interface ChatController { - addEvent: (event: ContextEvent) => void + addEvent: (event: ChatEvent) => void cleanup: () => void } interface ChatAppProps { contextName: string - initialEvents: PersistedEvent[] + initialEvents: ChatEvent[] callbacks: ChatCallbacks controllerRef: React.MutableRefObject } +const hasTag = (item: FeedItem): item is FeedItem & { _tag: string } => "_tag" in item + function ChatApp({ contextName, initialEvents, callbacks, controllerRef }: ChatAppProps) { // Derive initial feed items from history events (runs once on mount) const initialFeedItems = useMemo( @@ -340,20 +557,22 @@ function ChatApp({ contextName, initialEvents, callbacks, controllerRef }: ChatA // Check if we have any history (for separator display) const hasHistory = initialFeedItems.some( (item) => - item._tag === "UserMessageItem" || - item._tag === "AssistantMessageItem" || - item._tag === "LLMInterruptionItem" + hasTag(item) && ( + item._tag === "UserMessageItem" || + item._tag === "AssistantMessageItem" || + item._tag === "LLMInterruptionItem" + ) ) // Check if currently streaming (for input placeholder) - const isStreaming = feedItems.some((item) => item._tag === "InProgressAssistantItem") + const isStreaming = feedItems.some((item) => hasTag(item) && item._tag === "InProgressAssistantItem") const isStreamingRef = useRef(false) isStreamingRef.current = isStreaming // Set up controller synchronously during first render if (!controllerRef.current) { controllerRef.current = { - addEvent(event: ContextEvent) { + addEvent(event: ChatEvent) { dispatchRef.current({ event, isHistory: false }) }, cleanup() { @@ -421,7 +640,7 @@ function ChatApp({ contextName, initialEvents, callbacks, controllerRef }: ChatA export async function runOpenTUIChat( contextName: string, - initialEvents: PersistedEvent[], + initialEvents: ChatEvent[], callbacks: ChatCallbacks ): Promise { let exitSignaled = false @@ -462,7 +681,7 @@ export async function runOpenTUIChat( renderer.start() return { - addEvent(event: ContextEvent) { + addEvent(event: ChatEvent) { controllerRef.current?.addEvent(event) }, cleanup() { diff --git a/src/cli/main.ts b/src/cli/main.ts index 06a43a6..0c5fc10 100644 --- a/src/cli/main.ts +++ b/src/cli/main.ts @@ -7,6 +7,7 @@ import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai" import { FetchHttpClient } from "@effect/platform" import { BunContext, BunRuntime } from "@effect/platform-bun" import { Cause, Effect, Layer } from "effect" +import { AgentRegistry } from "../agent-registry.ts" import { AppConfig, extractConfigPath, @@ -15,9 +16,10 @@ import { type MiniAgentConfig as MiniAgentConfigType, resolveBaseDir } from "../config.ts" -import { ContextRepository } from "../context.repository.ts" -import { ContextService } from "../context.service.ts" +import { EventReducer } from "../event-reducer.ts" +import { EventStoreFileSystem } from "../event-store-fs.ts" import { CurrentLlmConfig, getApiKey, type LlmConfig, resolveLlmConfig } from "../llm-config.ts" +import { LlmTurnLive } from "../llm-turn.ts" import { createLoggingLayer } from "../logging.ts" import { OpenAiChatClient, OpenAiChatLanguageModel } from "../openai-chat-completions-client.ts" import { createTracingLayer } from "../tracing.ts" @@ -97,8 +99,18 @@ const makeMainLayer = (args: ReadonlyArray) => const languageModelLayer = makeLanguageModelLayer(llmConfig) const tracingLayer = createTracingLayer("mini-agent") - return ContextService.layer.pipe( - Layer.provideMerge(ContextRepository.layer), + // Build the agent registry with all dependencies + const agentRegistryLayer = AgentRegistry.Default.pipe( + Layer.provide(LlmTurnLive), + Layer.provide(languageModelLayer), + Layer.provide(llmConfigLayer), + Layer.provide(EventStoreFileSystem), + Layer.provide(EventReducer.Default), + Layer.provide(appConfigLayer), + Layer.provide(BunContext.layer) + ) + + return agentRegistryLayer.pipe( Layer.provideMerge(languageModelLayer), Layer.provideMerge(llmConfigLayer), Layer.provideMerge(tracingLayer), diff --git a/src/context.model.ts b/src/context.model.ts deleted file mode 100644 index 532faff..0000000 --- a/src/context.model.ts +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Context Event Schemas - * - * A Context is the central concept in this codebase: a named, ordered list of events. - * Events represent conversation turns between user and assistant, plus system configuration. - * - * Event Types: - * - SystemPrompt: Initial AI behavior configuration (persisted) - * - UserMessage: Input from the user (persisted) - * - AssistantMessage: Complete response from the AI (persisted) - * - TextDelta: Streaming chunk (ephemeral, never persisted) - * - SetLlmConfig: LLM configuration for this context (persisted) - */ -import { Schema } from "effect" -import { LlmConfig } from "./llm-config.ts" - -/** Branded type for context names - prevents mixing with other strings */ -export const ContextName = Schema.String.pipe(Schema.brand("ContextName")) -export type ContextName = typeof ContextName.Type - -/** Message format for LLM APIs and tracing */ -export interface LLMMessage { - readonly role: "system" | "user" | "assistant" - readonly content: string -} - -/** System prompt event - sets the AI's behavior */ -export class SystemPromptEvent extends Schema.TaggedClass()("SystemPrompt", { - content: Schema.String -}) { - toLLMMessage(): LLMMessage { - return { role: "system", content: this.content } - } -} - -/** User message event - input from the user */ -export class UserMessageEvent extends Schema.TaggedClass()("UserMessage", { - content: Schema.String -}) { - toLLMMessage(): LLMMessage { - return { role: "user", content: this.content } - } -} - -/** Assistant message event - complete response from the AI */ -export class AssistantMessageEvent extends Schema.TaggedClass()("AssistantMessage", { - content: Schema.String -}) { - toLLMMessage(): LLMMessage { - return { role: "assistant", content: this.content } - } -} - -/** Text delta event - streaming chunk (ephemeral, never persisted) */ -export class TextDeltaEvent extends Schema.TaggedClass()("TextDelta", { - delta: Schema.String -}) {} - -/** Reason for LLM request interruption */ -export const InterruptReason = Schema.Literal("user_cancel", "user_new_message", "timeout") -export type InterruptReason = typeof InterruptReason.Type - -/** Emitted when LLM request is interrupted - persisted because it contains partial response */ -export class LLMRequestInterruptedEvent - extends Schema.TaggedClass()("LLMRequestInterrupted", { - requestId: Schema.String, - reason: InterruptReason, - partialResponse: Schema.String - }) -{ - toLLMMessage(): LLMMessage { - return { role: "assistant", content: this.partialResponse } - } -} - -/** Attachment source - local file path or remote URL */ -export const AttachmentSource = Schema.Union( - Schema.Struct({ type: Schema.Literal("file"), path: Schema.String }), - Schema.Struct({ type: Schema.Literal("url"), url: Schema.String }) -) -export type AttachmentSource = typeof AttachmentSource.Type - -/** File attachment event - image or other file shared with AI */ -export class FileAttachmentEvent extends Schema.TaggedClass()( - "FileAttachment", - { - source: AttachmentSource, - mediaType: Schema.String, - fileName: Schema.optional(Schema.String) - } -) {} - -/** Sets the LLM config for this context. Added when context is created. */ -export class SetLlmConfigEvent extends Schema.TaggedClass()( - "SetLlmConfig", - { config: LlmConfig } -) {} - -/** Events that get persisted to the context file */ -export const PersistedEvent = Schema.Union( - SystemPromptEvent, - UserMessageEvent, - AssistantMessageEvent, - LLMRequestInterruptedEvent, - FileAttachmentEvent, - SetLlmConfigEvent -) -export type PersistedEvent = typeof PersistedEvent.Type - -/** All possible context events (persisted + ephemeral) */ -export const ContextEvent = Schema.Union( - SystemPromptEvent, - UserMessageEvent, - AssistantMessageEvent, - LLMRequestInterruptedEvent, - FileAttachmentEvent, - SetLlmConfigEvent, - TextDeltaEvent -) -export type ContextEvent = typeof ContextEvent.Type - -/** Input events that can be added via addEvents */ -export const InputEvent = Schema.Union(UserMessageEvent, FileAttachmentEvent, SystemPromptEvent) -export type InputEvent = typeof InputEvent.Type - -export const DEFAULT_SYSTEM_PROMPT = `You are a helpful, friendly assistant. -Keep your responses concise but informative. -Use markdown formatting when helpful.` diff --git a/src/context.repository.ts b/src/context.repository.ts deleted file mode 100644 index d4fbe4c..0000000 --- a/src/context.repository.ts +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Context Repository - * - * Handles file I/O for context persistence. Contexts are stored as YAML files - * in the configured data storage directory. - */ -import { FileSystem, Path } from "@effect/platform" -import { Context, Effect, Layer, Option, Schema } from "effect" -import * as YAML from "yaml" -import { AppConfig } from "./config.ts" -import { - ContextName, - DEFAULT_SYSTEM_PROMPT, - PersistedEvent, - type PersistedEvent as PersistedEventType, - SystemPromptEvent -} from "./context.model.ts" -import { ContextLoadError, ContextSaveError } from "./errors.ts" - -/** - * Decode a plain object to a PersistedEvent class instance. - * This ensures the event has all the methods defined on the class. - */ -const decodeEvent = Schema.decodeUnknownSync(PersistedEvent) - -/** - * Decode an array of plain objects to PersistedEvent class instances. - */ -const decodeEvents = (rawEvents: Array): Array => rawEvents.map((raw) => decodeEvent(raw)) - -/** - * Encode an event to a plain object for YAML serialization. - */ -const encodeEvent = Schema.encodeSync(PersistedEvent) - -export class ContextRepository extends Context.Tag("@app/ContextRepository")< - ContextRepository, - { - readonly load: (contextName: string) => Effect.Effect, ContextLoadError> - readonly loadOrCreate: ( - contextName: string - ) => Effect.Effect, ContextLoadError | ContextSaveError> - readonly save: ( - contextName: string, - events: ReadonlyArray - ) => Effect.Effect - /** Append events to existing context (load + save atomically) */ - readonly append: ( - contextName: string, - events: ReadonlyArray - ) => Effect.Effect - readonly list: () => Effect.Effect, ContextLoadError> - readonly getContextsDir: () => string - } ->() { - /** - * Production layer with file system persistence. - */ - static readonly layer = Layer.effect( - ContextRepository, - Effect.gen(function*() { - const fs = yield* FileSystem.FileSystem - const path = yield* Path.Path - const config = yield* AppConfig - - // Resolve the contexts directory from config - const cwd = Option.getOrElse(config.cwd, () => process.cwd()) - const contextsDir = path.join(cwd, config.dataStorageDir, "contexts") - - const getContextPath = (contextName: string) => path.join(contextsDir, `${contextName}.yaml`) - - // Service methods wrapped with Effect.fn for call-site tracing - // See: https://www.effect.solutions/services-and-layers - - /** - * Save events to a context file. - */ - const save = Effect.fn("ContextRepository.save")( - function*(contextName: string, events: ReadonlyArray) { - const filePath = getContextPath(contextName) - - // Ensure directory exists - yield* fs.makeDirectory(contextsDir, { recursive: true }).pipe( - Effect.catchAll(() => Effect.void) - ) - - // Convert to plain objects for YAML serialization using Schema encoding - const plainEvents = events.map((e) => encodeEvent(e)) - - const yaml = YAML.stringify({ events: plainEvents }) - yield* fs.writeFileString(filePath, yaml).pipe( - Effect.catchAll((error) => - new ContextSaveError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - } - ) - - /** - * Load events from a context file. - * Returns empty array if context doesn't exist. - */ - const load = Effect.fn("ContextRepository.load")( - function*(contextName: string) { - const filePath = getContextPath(contextName) - const exists = yield* fs.exists(filePath).pipe( - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - - if (!exists) { - return [] as Array - } - - return yield* fs.readFileString(filePath).pipe( - Effect.map((yaml) => { - const parsed = YAML.parse(yaml) as { events?: Array } - return decodeEvents(parsed?.events ?? []) - }), - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - } - ) - - /** - * Load events from a context, creating it with default system prompt if it doesn't exist. - */ - const loadOrCreate = Effect.fn("ContextRepository.loadOrCreate")( - function*(contextName: string) { - const filePath = getContextPath(contextName) - const exists = yield* fs.exists(filePath).pipe( - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - - if (!exists) { - const initialEvents = [new SystemPromptEvent({ content: DEFAULT_SYSTEM_PROMPT })] - yield* save(contextName, initialEvents) - return initialEvents - } - - return yield* fs.readFileString(filePath).pipe( - Effect.map((yaml) => { - const parsed = YAML.parse(yaml) as { events?: Array } - return decodeEvents(parsed?.events ?? []) - }), - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - } - ) - - /** - * Append events to an existing context. - * Loads current events, appends new ones, and saves atomically. - */ - const append = Effect.fn("ContextRepository.append")( - function*(contextName: string, newEvents: ReadonlyArray) { - const existing = yield* load(contextName) - const combined = [...existing, ...newEvents] - yield* save(contextName, combined) - } - ) - - /** - * List all existing context names, sorted by most recently modified first. - */ - const list = Effect.fn("ContextRepository.list")( - function*() { - const exists = yield* fs.exists(contextsDir).pipe( - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(""), - cause: error - }) - ) - ) - if (!exists) return [] as Array - - const entries = yield* fs.readDirectory(contextsDir).pipe( - Effect.map((names) => names.filter((name) => name.endsWith(".yaml"))), - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(""), - cause: error - }) - ) - ) - - // Get modification times for each file - const entriesWithTimes = yield* Effect.all( - entries.map((name) => - fs.stat(path.join(contextsDir, name)).pipe( - Effect.map((stat) => ({ - name: name.replace(/\.yaml$/, ""), - mtime: Option.getOrElse(stat.mtime, () => new Date(0)) - })), - Effect.catchAll(() => Effect.succeed({ name: name.replace(/\.yaml$/, ""), mtime: new Date(0) })) - ) - ) - ) - - // Sort by modification time, most recent first - return entriesWithTimes - .sort((a, b) => b.mtime.getTime() - a.mtime.getTime()) - .map((entry) => entry.name) - } - ) - - return ContextRepository.of({ - load, - loadOrCreate, - save, - append, - list, - getContextsDir: () => contextsDir - }) - }) - ) - - /** - * Test layer with in-memory storage for unit tests. - * See: https://www.effect.solutions/testing - */ - static readonly testLayer = Layer.sync(ContextRepository, () => { - const store = new Map>() - - return ContextRepository.of({ - load: (contextName: string) => Effect.succeed(store.get(contextName) ?? []), - loadOrCreate: (contextName: string) => - Effect.sync(() => { - const existing = store.get(contextName) - if (existing) return existing - const initial = [new SystemPromptEvent({ content: DEFAULT_SYSTEM_PROMPT })] - store.set(contextName, initial) - return initial - }), - save: (contextName: string, events: ReadonlyArray) => - Effect.sync(() => void store.set(contextName, [...events])), - append: (contextName: string, newEvents: ReadonlyArray) => - Effect.sync(() => { - const existing = store.get(contextName) ?? [] - store.set(contextName, [...existing, ...newEvents]) - }), - list: () => Effect.sync(() => Array.from(store.keys()).sort()), - getContextsDir: () => "/test/contexts" - }) - }) -} diff --git a/src/context.service.ts b/src/context.service.ts deleted file mode 100644 index 6669d59..0000000 --- a/src/context.service.ts +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Context Service - * - * The main domain service for working with Contexts. - * - * A Context is a named, ordered list of events representing a conversation. - * The only supported operation is `addEvents`: - * 1. Appends input events (typically UserMessage) to the context - * 2. Triggers an LLM request with the full event history - * 3. Streams back new events (TextDelta ephemeral, AssistantMessage persisted) - * 4. Persists the new events to the context file - */ -import type { AiError, LanguageModel } from "@effect/ai" -import type { Error as PlatformError, FileSystem } from "@effect/platform" -import { Context, Effect, Layer, pipe, Schema, Stream } from "effect" -import { - AssistantMessageEvent, - type ContextEvent, - DEFAULT_SYSTEM_PROMPT, - type InputEvent, - PersistedEvent, - type PersistedEvent as PersistedEventType, - SetLlmConfigEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "./context.model.ts" -import { ContextRepository } from "./context.repository.ts" -import type { ContextLoadError, ContextSaveError } from "./errors.ts" -import { CurrentLlmConfig, LlmConfig } from "./llm-config.ts" -import { streamLLMResponse } from "./llm.ts" - -// ============================================================================= -// Context Service -// ============================================================================= - -export class ContextService extends Context.Tag("@app/ContextService")< - ContextService, - { - /** - * Add events to a context, triggering LLM processing if UserMessage present. - * - * This is the core operation on a Context: - * 1. Loads existing events (or creates context with system prompt) - * 2. Appends the input events (UserMessage and/or FileAttachment) - * 3. Runs LLM with full history (only if UserMessage present) - * 4. Streams back TextDelta (ephemeral) and AssistantMessage (persisted) - * 5. Persists new events as they complete - */ - readonly addEvents: ( - contextName: string, - inputEvents: ReadonlyArray - ) => Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig - > - - /** Load all events from a context. */ - readonly load: (contextName: string) => Effect.Effect, ContextLoadError> - - /** List all context names. */ - readonly list: () => Effect.Effect, ContextLoadError> - - /** Persist an event directly (e.g., LLMRequestInterruptedEvent on cancel). */ - readonly persistEvent: ( - contextName: string, - event: PersistedEventType - ) => Effect.Effect - } ->() { - /** - * Production layer with file system persistence and LLM integration. - */ - static readonly layer = Layer.effect( - ContextService, - Effect.gen(function*() { - const repo = yield* ContextRepository - - // Service methods wrapped with Effect.fn for call-site tracing - // See: https://www.effect.solutions/services-and-layers - - const addEvents = ( - contextName: string, - inputEvents: ReadonlyArray - ): Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig - > => { - // Check if any UserMessage is present (triggers LLM) - const hasUserMessage = inputEvents.some(Schema.is(UserMessageEvent)) - - return pipe( - // Load or create context, append input events - Effect.fn("ContextService.addEvents.prepare")(function*() { - const llmConfig = yield* CurrentLlmConfig - - // Check if context exists before loading/creating - const existingEvents = yield* repo.load(contextName) - const isNewContext = existingEvents.length === 0 - - // If new context, create with system prompt and LLM config - const baseEvents = isNewContext - ? [ - new SystemPromptEvent({ content: DEFAULT_SYSTEM_PROMPT }), - new SetLlmConfigEvent({ config: llmConfig }) - ] - : existingEvents - - const newPersistedInputs = inputEvents.filter(Schema.is(PersistedEvent)) as Array - - if (isNewContext || newPersistedInputs.length > 0) { - const allEvents = [...baseEvents, ...newPersistedInputs] - yield* repo.save(contextName, allEvents) - return allEvents - } - return baseEvents - })(), - // Only stream LLM response if there's a UserMessage - Effect.andThen((events) => hasUserMessage ? streamLLMResponse(events) : Stream.empty), - Stream.unwrap, - // Persist events as they complete (only persisted ones) - Stream.tap((event) => - Schema.is(PersistedEvent)(event) - ? Effect.gen(function*() { - const current = yield* repo.load(contextName) - yield* repo.save(contextName, [...current, event]) - }) - : Effect.void - ) - ) - } - - const load = Effect.fn("ContextService.load")( - function*(contextName: string) { - return yield* repo.load(contextName) - } - ) - - const list = Effect.fn("ContextService.list")( - function*() { - return yield* repo.list() - } - ) - - const persistEvent = Effect.fn("ContextService.persistEvent")( - function*(contextName: string, event: PersistedEventType) { - const current = yield* repo.load(contextName) - yield* repo.save(contextName, [...current, event]) - } - ) - - return ContextService.of({ - addEvents, - load, - list, - persistEvent - }) - }) - ) - - /** - * Test layer with mock LLM responses for unit tests. - * See: https://www.effect.solutions/testing - */ - static readonly testLayer = Layer.sync(ContextService, () => { - // In-memory store for test contexts - const store = new Map>() - - // Mock LLM config for tests - const mockLlmConfig = new LlmConfig({ - apiFormat: "openai-responses", - model: "test-model", - baseUrl: "https://api.test.com", - apiKeyEnvVar: "TEST_API_KEY" - }) - - return ContextService.of({ - addEvents: ( - contextName: string, - inputEvents: ReadonlyArray - ): Stream.Stream => { - // Load or create context - let events = store.get(contextName) - if (!events) { - events = [ - new SystemPromptEvent({ content: "Test system prompt" }), - new SetLlmConfigEvent({ config: mockLlmConfig }) - ] - store.set(contextName, events) - } - - // Add input events - const newPersistedInputs = inputEvents.filter(Schema.is(PersistedEvent)) as Array - if (newPersistedInputs.length > 0) { - events = [...events, ...newPersistedInputs] - store.set(contextName, events) - } - - // Check if any UserMessage is present - const hasUserMessage = inputEvents.some(Schema.is(UserMessageEvent)) - - // Only generate mock LLM response if there's a UserMessage - if (!hasUserMessage) { - return Stream.empty - } - - // Mock LLM response stream - const mockResponse = "This is a mock response for testing." - const assistantEvent = new AssistantMessageEvent({ content: mockResponse }) - - return Stream.make( - new TextDeltaEvent({ delta: mockResponse }), - assistantEvent - ).pipe( - Stream.tap((event) => - Schema.is(PersistedEvent)(event) - ? Effect.sync(() => { - const current = store.get(contextName) ?? [] - store.set(contextName, [...current, event]) - }) - : Effect.void - ) - ) - }, - - load: (contextName: string) => Effect.succeed(store.get(contextName) ?? []), - - list: () => Effect.sync(() => Array.from(store.keys()).sort()), - - persistEvent: (contextName: string, event: PersistedEventType) => - Effect.sync(() => { - const current = store.get(contextName) ?? [] - store.set(contextName, [...current, event]) - }) - }) - }) -} diff --git a/src/new-architecture/domain.ts b/src/domain.ts similarity index 86% rename from src/new-architecture/domain.ts rename to src/domain.ts index 8fd4e8b..4ce8c2a 100644 --- a/src/new-architecture/domain.ts +++ b/src/domain.ts @@ -164,6 +164,27 @@ export class AgentTurnFailedEvent extends Schema.TaggedClass()( + "FileAttachmentEvent", + { + ...BaseEventFields, + source: AttachmentSource, + mediaType: Schema.String, + fileName: Schema.optionalWith(Schema.String, { as: "Option" }) + } +) {} + // ----------------------------------------------------------------------------- // Event Union // ----------------------------------------------------------------------------- @@ -180,7 +201,8 @@ export const ContextEvent = Schema.Union( AgentTurnStartedEvent, AgentTurnCompletedEvent, AgentTurnInterruptedEvent, - AgentTurnFailedEvent + AgentTurnFailedEvent, + FileAttachmentEvent ) export type ContextEvent = typeof ContextEvent.Type @@ -385,5 +407,40 @@ export const EventBuilder = { ...EventBuilder.makeBase(agentName, contextName, nextEventNumber, false, parentEventId), turnNumber, durationMs + }), + + textDelta: ( + agentName: AgentName, + contextName: ContextName, + nextEventNumber: number, + delta: string, + parentEventId: Option.Option = Option.none() + ) => + new TextDeltaEvent({ + ...EventBuilder.makeBase(agentName, contextName, nextEventNumber, false, parentEventId), + delta + }), + + fileAttachment: ( + agentName: AgentName, + contextName: ContextName, + nextEventNumber: number, + source: AttachmentSource, + mediaType: string, + fileName: Option.Option = Option.none() + ) => + new FileAttachmentEvent({ + ...EventBuilder.makeBase(agentName, contextName, nextEventNumber, false), + source, + mediaType, + fileName }) } + +// ----------------------------------------------------------------------------- +// Default System Prompt +// ----------------------------------------------------------------------------- + +export const DEFAULT_SYSTEM_PROMPT = `You are a helpful, friendly assistant. +Keep your responses concise but informative. +Use markdown formatting when helpful.` diff --git a/src/errors.ts b/src/errors.ts index fe08d68..644916f 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,14 +1,16 @@ /** * Domain Error Types * - * Uses Schema.TaggedError for serializable, type-safe error handling. - * See: https://www.effect.solutions/error-handling + * Re-exports error types from domain.ts for backward compatibility. */ import { Schema } from "effect" -import { ContextName } from "./context.model.ts" +import { ContextName } from "./domain.ts" + +// Re-export errors from domain +export { AgentError, AgentNotFoundError, ContextLoadError, ContextSaveError, ReducerError } from "./domain.ts" // ============================================================================= -// Context Errors +// Legacy Error Types (for backward compatibility) // ============================================================================= /** Error when a context is not found */ @@ -17,30 +19,8 @@ export class ContextNotFound extends Schema.TaggedError()( { name: ContextName } ) {} -/** Error when loading a context fails */ -export class ContextLoadError extends Schema.TaggedError()( - "ContextLoadError", - { - name: ContextName, - cause: Schema.Defect - } -) {} - -/** Error when saving a context fails */ -export class ContextSaveError extends Schema.TaggedError()( - "ContextSaveError", - { - name: ContextName, - cause: Schema.Defect - } -) {} - /** Union of all context-related errors */ -export const ContextError = Schema.Union( - ContextNotFound, - ContextLoadError, - ContextSaveError -) +export const ContextError = Schema.Union(ContextNotFound) export type ContextError = typeof ContextError.Type // ============================================================================= diff --git a/src/new-architecture/event-reducer.ts b/src/event-reducer.ts similarity index 74% rename from src/new-architecture/event-reducer.ts rename to src/event-reducer.ts index d90f0c2..e57070f 100644 --- a/src/new-architecture/event-reducer.ts +++ b/src/event-reducer.ts @@ -14,6 +14,7 @@ import { AgentConfig, type ContextEvent, defaultAgentConfig, + InterruptReason, LlmProviderConfig, type ReducedContext, ReducerError @@ -32,6 +33,15 @@ export class EventReducer extends Effect.Service()("@mini-agent/Ev agentTurnStartedAtEventId: Option.none() } + const interruptionReasonDescriptions: Record = { + user_cancel: "user cancelled the response", + user_new_message: "user interrupted to send a new message", + timeout: "the response timed out" + } + + const interruptionExplanation = (reason: InterruptReason): string => + `The previous assistant response was interrupted (${interruptionReasonDescriptions[reason]}). Please continue from where it stopped.` + const reduceOne = ( ctx: ReducedContext, event: ContextEvent @@ -76,6 +86,11 @@ export class EventReducer extends Effect.Service()("@mini-agent/Ev return { ...ctx, nextEventNumber } } + case "FileAttachmentEvent": { + // File attachments are handled during LLM request, not stored in messages + return { ...ctx, nextEventNumber } + } + case "SetLlmConfigEvent": { const providerConfig = new LlmProviderConfig({ providerId: event.providerId, @@ -131,9 +146,31 @@ export class EventReducer extends Effect.Service()("@mini-agent/Ev } } - case "AgentTurnInterruptedEvent": + case "AgentTurnInterruptedEvent": { + const partialMessages = Option.match(event.partialResponse, { + onNone: () => [] as ReadonlyArray, + onSome: (partial) => + [ + Prompt.assistantMessage({ + content: [Prompt.textPart({ text: partial })] + }) + ] as ReadonlyArray + }) + + const explanationMessage = Prompt.userMessage({ + content: [Prompt.textPart({ text: interruptionExplanation(event.reason) })] + }) + + return { + ...ctx, + messages: [...ctx.messages, ...partialMessages, explanationMessage], + agentTurnStartedAtEventId: Option.none(), + nextEventNumber + } + } + case "AgentTurnFailedEvent": { - // Turn ended (failed or interrupted) - clear in-progress state + // Turn failed - clear in-progress state return { ...ctx, agentTurnStartedAtEventId: Option.none(), diff --git a/src/new-architecture/event-store-fs.ts b/src/event-store-fs.ts similarity index 99% rename from src/new-architecture/event-store-fs.ts rename to src/event-store-fs.ts index 1f61100..1e34be7 100644 --- a/src/new-architecture/event-store-fs.ts +++ b/src/event-store-fs.ts @@ -9,7 +9,7 @@ import { FileSystem, Path } from "@effect/platform" import { Deferred, Effect, Layer, Option, Queue, Ref, Schema } from "effect" import * as YAML from "yaml" -import { AppConfig } from "../config.ts" +import { AppConfig } from "./config.ts" import { ContextEvent, ContextLoadError, type ContextName, ContextSaveError } from "./domain.ts" import { EventStore } from "./event-store.ts" @@ -32,7 +32,7 @@ export const EventStoreFileSystem: Layer.Layer< const config = yield* AppConfig const cwd = Option.getOrElse(config.cwd, () => process.cwd()) - const contextsDir = path.join(cwd, config.dataStorageDir, "contexts-v2") + const contextsDir = path.join(cwd, config.dataStorageDir, "contexts") const getContextPath = (contextName: ContextName) => path.join(contextsDir, `${contextName}.yaml`) diff --git a/src/new-architecture/event-store.ts b/src/event-store.ts similarity index 97% rename from src/new-architecture/event-store.ts rename to src/event-store.ts index 2406b05..e25357e 100644 --- a/src/new-architecture/event-store.ts +++ b/src/event-store.ts @@ -3,7 +3,7 @@ * * Pluggable implementations: * - InMemory: For tests (fresh state per layer creation) - * - Future: Yaml files, Postgres, Redis + * - FileSystem: For production (YAML files) */ import { Effect, Layer } from "effect" diff --git a/src/http.ts b/src/http.ts index 0deb697..26dd3ca 100644 --- a/src/http.ts +++ b/src/http.ts @@ -1,95 +1,86 @@ /** * HTTP Server for Agent * - * Provides HTTP endpoints that mirror the CLI interface: - * - POST /context/:contextName - Send events, receive SSE stream of responses + * Endpoints: + * - POST /agent/:agentName - Send message, receive SSE stream of events + * - GET /health - Health check */ -import { LanguageModel } from "@effect/ai" -import { FileSystem, HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" -import { Effect, Schema, Stream } from "effect" -import type { ContextEvent } from "./context.model.ts" -import { CurrentLlmConfig } from "./llm-config.ts" -import { AgentServer, ScriptInputEvent } from "./server.service.ts" + +import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" +import { Effect, Fiber, Runtime, Schema, Stream } from "effect" +import { AgentRegistry } from "./agent-registry.ts" +import { type AgentName, type ContextEvent, EventBuilder } from "./domain.ts" /** Encode a ContextEvent as an SSE data line */ const encodeSSE = (event: ContextEvent): Uint8Array => new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`) -/** Error for JSONL parsing failures */ -class JsonParseError extends Error { - readonly _tag = "JsonParseError" - constructor(readonly line: number, readonly rawLine: string, readonly originalError: unknown) { - super( - `Invalid JSON at line ${line}: ${originalError instanceof Error ? originalError.message : String(originalError)}` - ) - } -} +/** Input message schema */ +const InputMessage = Schema.Struct({ + _tag: Schema.Literal("UserMessage"), + content: Schema.String +}) +type InputMessage = typeof InputMessage.Type -/** Parse JSONL body into ScriptInputEvents */ -const parseJsonlBody = (body: string) => +/** Parse JSON body into InputMessage */ +const parseBody = (body: string) => Effect.gen(function*() { - const lines = body.split("\n").filter((line) => line.trim() !== "") - const events: Array = [] - - for (const [i, line] of lines.entries()) { - const json = yield* Effect.try({ - try: () => JSON.parse(line) as unknown, - catch: (e) => new JsonParseError(i + 1, line, e) - }) - const event = yield* Schema.decodeUnknown(ScriptInputEvent)(json) - events.push(event) - } - - return events + const json = yield* Effect.try({ + try: () => JSON.parse(body) as unknown, + catch: (e) => new Error(`Invalid JSON: ${e instanceof Error ? e.message : String(e)}`) + }) + return yield* Schema.decodeUnknown(InputMessage)(json) }) -/** Handler for POST /context/:contextName */ -const contextHandler = Effect.gen(function*() { +/** Handler for POST /agent/:agentName */ +const agentHandler = Effect.gen(function*() { const request = yield* HttpServerRequest.HttpServerRequest - const agentServer = yield* AgentServer + const registry = yield* AgentRegistry const params = yield* HttpRouter.params - // Get context services to provide to the stream - const langModel = yield* LanguageModel.LanguageModel - const fs = yield* FileSystem.FileSystem - const llmConfig = yield* CurrentLlmConfig + // Capture the runtime to use in the Stream.async callback + const runtime = yield* Effect.runtime() - const contextName = params.contextName - if (!contextName) { - return HttpServerResponse.text("Missing contextName", { status: 400 }) + const agentName = params.agentName + if (!agentName) { + return HttpServerResponse.text("Missing agentName", { status: 400 }) } - yield* Effect.logDebug("POST /context/:contextName", { contextName }) + yield* Effect.logDebug("POST /agent/:agentName", { agentName }) - // Read body as text and parse JSONL + // Read body const body = yield* request.text if (body.trim() === "") { return HttpServerResponse.text("Empty request body", { status: 400 }) } - const parseResult = yield* parseJsonlBody(body).pipe(Effect.either) + const parseResult = yield* parseBody(body).pipe(Effect.either) if (parseResult._tag === "Left") { - const error = parseResult.left - const message = error instanceof JsonParseError - ? error.message - : `Invalid event format: ${error instanceof Error ? error.message : String(error)}` - return HttpServerResponse.text(message, { status: 400 }) + return HttpServerResponse.text(parseResult.left.message, { status: 400 }) } - const events = parseResult.right - if (events.length === 0) { - return HttpServerResponse.text("No valid events in body", { status: 400 }) - } + const message = parseResult.right + + // Get or create agent + const agent = yield* registry.getOrCreate(agentName as AgentName) - // Stream SSE events directly - provide services to remove context requirements - const sseStream = agentServer.handleRequest(contextName, events).pipe( - Stream.map(encodeSSE), - Stream.provideService(LanguageModel.LanguageModel, langModel), - Stream.provideService(FileSystem.FileSystem, fs), - Stream.provideService(CurrentLlmConfig, llmConfig) + // Get historical events (includes SessionStartedEvent, any prior events) + const historicalEvents = yield* agent.getEvents + + const ctx = yield* agent.getReducedContext + + // Prepare user event + const userEvent = EventBuilder.userMessage( + agentName as AgentName, + agent.contextName, + ctx.nextEventNumber, + message.content ) + // Create SSE stream using the captured runtime + const sseStream = makeSseStream(runtime, agent, historicalEvents, userEvent) + return HttpServerResponse.stream(sseStream, { contentType: "text/event-stream", headers: { @@ -99,20 +90,58 @@ const contextHandler = Effect.gen(function*() { }) }) +/** Create SSE stream that emits all events (historical + new) until turn completes */ +const makeSseStream = ( + runtime: Runtime.Runtime, + agent: { + events: Stream.Stream + addEvent: (e: ContextEvent) => Effect.Effect + }, + historicalEvents: ReadonlyArray, + userEvent: ContextEvent +): Stream.Stream => + Stream.async((emit) => { + const runStream = Effect.gen(function*() { + // First emit all historical events (SessionStartedEvent, any prior events) + for (const event of historicalEvents) { + emit.single(encodeSSE(event)) + } + + // Fork the event subscription to receive new events + const streamFiber = yield* agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((e) => + Effect.sync(() => { + emit.single(encodeSSE(e)) + }) + ), + Stream.runDrain, + Effect.fork + ) + + // Small delay to ensure the subscription fiber is actually consuming + yield* Effect.sleep("10 millis") + + // Add user event to agent (it will be broadcast to the subscriber) + yield* agent.addEvent(userEvent) + + // Wait for stream to complete + yield* Fiber.join(streamFiber) + emit.end() + }) + + // Run with the captured runtime + Runtime.runFork(runtime)(runStream) + }) + /** Health check endpoint */ const healthHandler = Effect.gen(function*() { yield* Effect.logDebug("GET /health") return yield* HttpServerResponse.json({ status: "ok" }) }) -/** Create the HTTP router - context requirements will be inferred */ +/** HTTP router */ export const makeRouter = HttpRouter.empty.pipe( - HttpRouter.post("/context/:contextName", contextHandler), + HttpRouter.post("/agent/:agentName", agentHandler), HttpRouter.get("/health", healthHandler) ) - -/** Run the server and log the address - for standalone use */ -export const runServer = Effect.gen(function*() { - yield* Effect.log("Server started") - return yield* Effect.never -}) diff --git a/src/layercode/cli.ts b/src/layercode/cli.ts index e434987..b7074c6 100644 --- a/src/layercode/cli.ts +++ b/src/layercode/cli.ts @@ -10,7 +10,6 @@ import { BunHttpServer } from "@effect/platform-bun" import { Console, Effect, Layer, Option, Stream } from "effect" import { AppConfig } from "../config.ts" import { makeRouter } from "../http.ts" -import { AgentServer } from "../server.service.ts" import { makeLayerCodeRouter } from "./layercode.adapter.ts" const portOption = Options.integer("port").pipe( @@ -147,11 +146,6 @@ const layercodeServeCommand = CliCommand.make( // Create server layer with configured port/host const serverLayer = BunHttpServer.layer({ port: actualPort, hostname: actualHost }) - const layers = Layer.mergeAll( - serverLayer, - AgentServer.layer - ) - // Start the tunnel if enabled (fork it to run concurrently with server) if (tunnelEnabled && Option.isSome(agentId)) { yield* startTunnel(agentId.value, actualPort).pipe(Effect.fork) @@ -160,7 +154,7 @@ const layercodeServeCommand = CliCommand.make( // Use Layer.launch to keep the server running (blocks forever) return yield* Layer.launch( HttpServer.serve(combinedRouter).pipe( - Layer.provide(layers) + Layer.provide(serverLayer) ) ) }) diff --git a/src/layercode/layercode.adapter.ts b/src/layercode/layercode.adapter.ts index 71de0e6..35dff2b 100644 --- a/src/layercode/layercode.adapter.ts +++ b/src/layercode/layercode.adapter.ts @@ -6,21 +6,13 @@ * LayerCode sends events like: * { "type": "message", "text": "hello", "session_id": "abc", "turn_id": "123" } * - * We translate to: - * { "_tag": "UserMessage", "content": "hello" } - * - * And translate our responses back: - * { "_tag": "TextDelta", "delta": "Hi" } - * → - * data: {"type":"response.tts","content":"Hi","turn_id":"123"} + * We translate to UserMessageEvent and stream back responses. */ -import { LanguageModel } from "@effect/ai" -import { FileSystem, HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" -import { Effect, Option, Schema, Stream } from "effect" +import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" +import { Effect, Fiber, Option, Schema, Stream } from "effect" +import { AgentRegistry } from "../agent-registry.ts" import { AppConfig } from "../config.ts" -import { AssistantMessageEvent, type ContextEvent, TextDeltaEvent, UserMessageEvent } from "../context.model.ts" -import { CurrentLlmConfig } from "../llm-config.ts" -import { AgentServer } from "../server.service.ts" +import { type AgentName, type ContextEvent, EventBuilder } from "../domain.ts" import { maybeVerifySignature } from "./signature.ts" /** LayerCode incoming webhook event types */ @@ -78,7 +70,7 @@ interface LayerCodeEndResponse { type LayerCodeResponse = LayerCodeTTSResponse | LayerCodeEndResponse /** Convert context name from session_id */ -const sessionToContextName = (sessionId: string): string => `layercode-session-${sessionId}` +const sessionToAgentName = (sessionId: string): AgentName => `layercode-session-${sessionId}` as AgentName /** Encode LayerCode response as SSE */ const encodeLayerCodeSSE = (response: LayerCodeResponse): Uint8Array => @@ -89,7 +81,7 @@ const toLayerCodeResponse = ( event: ContextEvent, turnId: string ): LayerCodeResponse | null => { - if (Schema.is(TextDeltaEvent)(event)) { + if (event._tag === "TextDeltaEvent") { return { type: "response.tts", content: event.delta, @@ -97,7 +89,7 @@ const toLayerCodeResponse = ( } } - if (Schema.is(AssistantMessageEvent)(event)) { + if (event._tag === "AssistantMessageEvent") { return { type: "response.end", turn_id: turnId @@ -112,14 +104,9 @@ const toLayerCodeResponse = ( const layercodeWebhookHandler = (welcomeMessage: Option.Option) => Effect.gen(function*() { const request = yield* HttpServerRequest.HttpServerRequest - const agentServer = yield* AgentServer + const registry = yield* AgentRegistry const config = yield* AppConfig - // Get context services to provide to the stream - const langModel = yield* LanguageModel.LanguageModel - const fs = yield* FileSystem.FileSystem - const llmConfig = yield* CurrentLlmConfig - yield* Effect.logDebug("POST /layercode/webhook") // Read body @@ -160,22 +147,47 @@ const layercodeWebhookHandler = (welcomeMessage: Option.Option) => // Handle different event types switch (webhookEvent.type) { case "message": { - const contextName = sessionToContextName(webhookEvent.session_id) + const agentName = sessionToAgentName(webhookEvent.session_id) const turnId = webhookEvent.turn_id - // Convert to our format - const userMessage = new UserMessageEvent({ content: webhookEvent.text }) - - // Stream SSE events directly - provide services to remove context requirements - const sseStream = agentServer.handleRequest(contextName, [userMessage]).pipe( - Stream.map((event) => toLayerCodeResponse(event, turnId)), - Stream.filter((r): r is LayerCodeResponse => r !== null), - Stream.map(encodeLayerCodeSSE), - Stream.provideService(LanguageModel.LanguageModel, langModel), - Stream.provideService(FileSystem.FileSystem, fs), - Stream.provideService(CurrentLlmConfig, llmConfig) + // Get or create agent + const agent = yield* registry.getOrCreate(agentName) + const ctx = yield* agent.getReducedContext + + // Create user message event + const userEvent = EventBuilder.userMessage( + agentName, + agent.contextName, + ctx.nextEventNumber, + webhookEvent.text ) + // Stream SSE events + const sseStream = Stream.async((emit) => { + const runStream = Effect.gen(function*() { + // Subscribe to events before adding the user message + const streamFiber = yield* agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((event) => + Effect.sync(() => { + const response = toLayerCodeResponse(event, turnId) + if (response) { + emit.single(encodeLayerCodeSSE(response)) + } + }) + ), + Stream.runDrain, + Effect.fork + ) + + yield* agent.addEvent(userEvent) + yield* Fiber.join(streamFiber) + emit.end() + }) + + Effect.runPromise(runStream).catch(() => emit.end()) + }) + return HttpServerResponse.stream(sseStream, { contentType: "text/event-stream", headers: { @@ -279,11 +291,7 @@ export const makeLayerCodeRouter = ( welcomeMessage: Option.Option ): HttpRouter.HttpRouter< never, - | AgentServer - | AppConfig - | LanguageModel.LanguageModel - | FileSystem.FileSystem - | CurrentLlmConfig + AgentRegistry | AppConfig > => HttpRouter.empty.pipe( HttpRouter.post("/layercode/webhook", layercodeWebhookHandler(welcomeMessage)) diff --git a/src/new-architecture/llm-turn.ts b/src/llm-turn.ts similarity index 98% rename from src/new-architecture/llm-turn.ts rename to src/llm-turn.ts index 5178c9f..e6803c4 100644 --- a/src/new-architecture/llm-turn.ts +++ b/src/llm-turn.ts @@ -7,7 +7,6 @@ import { type AiError, LanguageModel, Prompt } from "@effect/ai" import { DateTime, Effect, Layer, Option, pipe, Ref, Stream } from "effect" -import { CurrentLlmConfig } from "../llm-config.ts" import { AgentError, type AgentName, @@ -19,6 +18,7 @@ import { type ReducedContext, TextDeltaEvent } from "./domain.ts" +import { CurrentLlmConfig } from "./llm-config.ts" /** * Convert ReducedContext messages to @effect/ai Prompt. diff --git a/src/llm.ts b/src/llm.ts deleted file mode 100644 index 0cb69b2..0000000 --- a/src/llm.ts +++ /dev/null @@ -1,178 +0,0 @@ -/** - * LLM Request - * - * Pure function that takes persisted events and produces a stream of context events. - */ -import { type AiError, LanguageModel, Prompt } from "@effect/ai" -import { type Error as PlatformError, FileSystem } from "@effect/platform" -import { Clock, Effect, Option, pipe, Ref, Schema, Stream } from "effect" -import { - AssistantMessageEvent, - type ContextEvent, - FileAttachmentEvent, - LLMRequestInterruptedEvent, - type PersistedEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "./context.model.ts" -import { CurrentLlmConfig } from "./llm-config.ts" - -// ============================================================================= -// Event to Prompt Conversion -// ============================================================================= - -const isSystem = Schema.is(SystemPromptEvent) -const isAssistant = Schema.is(AssistantMessageEvent) -const isUser = Schema.is(UserMessageEvent) -const isFile = Schema.is(FileAttachmentEvent) -const isInterrupted = Schema.is(LLMRequestInterruptedEvent) - -/** - * Groups consecutive user events (messages + attachments) into single multi-part messages. - * File attachments are read at call time, not persisted as base64. - * @internal Exported for testing - */ -export const eventsToPrompt = ( - events: ReadonlyArray -): Effect.Effect => - Effect.gen(function*() { - const fs = yield* FileSystem.FileSystem - const messages: Array = [] - - let i = 0 - while (i < events.length) { - const event = events[i]! - - if (isSystem(event)) { - messages.push(Prompt.makeMessage("system", { content: event.content })) - i++ - } else if (isAssistant(event)) { - messages.push( - Prompt.makeMessage("assistant", { - content: [Prompt.makePart("text", { text: event.content })] - }) - ) - i++ - } else if (isInterrupted(event)) { - // Only include if there was actual content before interruption - if (event.partialResponse.trim()) { - // Include partial response as assistant message - messages.push( - Prompt.makeMessage("assistant", { - content: [Prompt.makePart("text", { text: event.partialResponse })] - }) - ) - // Add user message explaining the interruption - messages.push( - Prompt.makeMessage("user", { - content: [Prompt.makePart("text", { - text: - `Your previous response was interrupted by the user. Here is what you said until that point:\n\n${event.partialResponse}` - })] - }) - ) - } - i++ - } else if (isUser(event) || isFile(event)) { - // Consecutive user/file events become a single multi-part user message - const userParts: Array = [] - - while (i < events.length) { - const e = events[i]! - if (isFile(e)) { - if (e.source.type === "file") { - const bytes = yield* fs.readFile(e.source.path) - userParts.push( - Prompt.makePart("file", { - mediaType: e.mediaType, - data: bytes, - fileName: e.fileName - }) - ) - } else { - userParts.push( - Prompt.makePart("file", { - mediaType: e.mediaType, - data: new URL(e.source.url), - fileName: e.fileName - }) - ) - } - i++ - } else if (isUser(e)) { - userParts.push(Prompt.makePart("text", { text: e.content })) - i++ - } else { - break - } - } - - if (userParts.length > 0) { - messages.push(Prompt.makeMessage("user", { content: userParts })) - } - } else { - i++ - } - } - - return Prompt.make(messages) - }) - -/** - * Stream an LLM response from persisted conversation events. - * - * Takes the full conversation history and produces: - * 1. TextDelta events for each streaming chunk (ephemeral) - * 2. A final AssistantMessage event with the complete response (persisted) - */ -export const streamLLMResponse = ( - events: ReadonlyArray -): Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig -> => - Stream.unwrap( - Effect.fn("LLM.streamLLMResponse")(function*() { - const model = yield* LanguageModel.LanguageModel - const llmConfig = yield* CurrentLlmConfig - const fullResponseRef = yield* Ref.make("") - const firstTokenReceivedRef = yield* Ref.make(false) - - yield* Effect.annotateCurrentSpan("gen_ai.base_url", llmConfig.baseUrl) - - yield* Effect.logDebug(`Streaming LLM response`, { - model: llmConfig.model, - apiFormat: llmConfig.apiFormat - }) - - const prompt = yield* eventsToPrompt(events) - const startTime = yield* Clock.currentTimeMillis - - return pipe( - model.streamText({ prompt }), - Stream.filterMap((part) => part.type === "text-delta" ? Option.some(part.delta) : Option.none()), - Stream.mapEffect((delta) => - Effect.gen(function*() { - const alreadyRecorded = yield* Ref.get(firstTokenReceivedRef) - if (!alreadyRecorded) { - yield* Ref.set(firstTokenReceivedRef, true) - const now = yield* Clock.currentTimeMillis - const ttft = now - startTime - yield* Effect.annotateCurrentSpan("gen_ai.time_to_first_token_ms", ttft) - } - yield* Ref.update(fullResponseRef, (t) => t + delta) - return new TextDeltaEvent({ delta }) as ContextEvent - }) - ), - Stream.concat( - Stream.fromEffect( - Ref.get(fullResponseRef).pipe( - Effect.map((content) => new AssistantMessageEvent({ content }) as ContextEvent) - ) - ) - ) - ) - })() - ) diff --git a/src/new-architecture/mini-agent.ts b/src/mini-agent.ts similarity index 99% rename from src/new-architecture/mini-agent.ts rename to src/mini-agent.ts index af075ea..8994918 100644 --- a/src/new-architecture/mini-agent.ts +++ b/src/mini-agent.ts @@ -178,7 +178,7 @@ export const makeMiniAgent = ( }) ), Stream.runDrain, - Effect.fork + Effect.forkDaemon ) // Helper: Add event to state and broadcast (via queue for serialization) @@ -228,7 +228,7 @@ export const makeMiniAgent = ( ) ), Stream.runDrain, - Effect.fork + Effect.forkDaemon ) // Track if shutdown has been called to prevent duplicate cleanup diff --git a/src/new-architecture/agent-registry.test.ts b/src/new-architecture/agent-registry.test.ts deleted file mode 100644 index 40e03e1..0000000 --- a/src/new-architecture/agent-registry.test.ts +++ /dev/null @@ -1,217 +0,0 @@ -/** - * AgentRegistry Tests - * - * Tests the agent lifecycle management: - * - Creating agents on demand (getOrCreate) - * - Caching agents by name - * - Listing active agents - * - Shutting down individual agents - * - Shutting down all agents - */ -import { describe, expect, it } from "@effect/vitest" -import { Effect, Layer, Stream } from "effect" -import { AgentRegistry } from "./agent-registry.ts" -import { type AgentName, type ContextEvent, MiniAgentTurn, type ReducedContext } from "./domain.ts" -import { EventReducer } from "./event-reducer.ts" -import { EventStore } from "./event-store.ts" - -// Mock MiniAgentTurn -const MockTurn = Layer.sync(MiniAgentTurn, () => - ({ - execute: (_ctx: ReducedContext) => Stream.empty as Stream.Stream - }) as unknown as MiniAgentTurn) - -// Test layer with all dependencies -// AgentRegistry.Default needs EventStore, EventReducer, and MiniAgentTurn -const TestLayer = AgentRegistry.Default.pipe( - Layer.provide(MockTurn), - Layer.provide(EventStore.InMemory), - Layer.provide(EventReducer.Default) -) - -describe("AgentRegistry", () => { - describe("getOrCreate", () => { - it.effect("creates new agent for unknown name", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - const agent = yield* registry.getOrCreate("new-agent" as AgentName) - - expect(agent.agentName).toBe("new-agent") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - - it.effect("returns same agent for same name", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - - const agent1 = yield* registry.getOrCreate("cached" as AgentName) - const agent2 = yield* registry.getOrCreate("cached" as AgentName) - - // Same instance - expect(agent1.agentName).toBe(agent2.agentName) - expect(agent1.contextName).toBe(agent2.contextName) - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - - it.effect("creates different agents for different names", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - - const agent1 = yield* registry.getOrCreate("agent-a" as AgentName) - const agent2 = yield* registry.getOrCreate("agent-b" as AgentName) - - expect(agent1.agentName).not.toBe(agent2.agentName) - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - - it.effect("assigns contextName based on agentName", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - const agent = yield* registry.getOrCreate("my-agent" as AgentName) - - // Context name should contain agent name - expect(agent.contextName.includes("my-agent")).toBe(true) - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - }) - - describe("get", () => { - it.effect("fails for non-existent agent", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - const result = yield* registry.get("non-existent" as AgentName).pipe( - Effect.either - ) - - expect(result._tag).toBe("Left") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - - it.effect("returns agent after getOrCreate", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - - yield* registry.getOrCreate("exists" as AgentName) - const agent = yield* registry.get("exists" as AgentName) - - expect(agent.agentName).toBe("exists") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - }) - - describe("list", () => { - it.effect("returns empty array when no agents", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - const agents = yield* registry.list - - expect(agents).toEqual([]) - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - - it.effect("returns all created agents", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - - yield* registry.getOrCreate("agent-1" as AgentName) - yield* registry.getOrCreate("agent-2" as AgentName) - yield* registry.getOrCreate("agent-3" as AgentName) - - const agents = yield* registry.list - - expect(agents.length).toBe(3) - expect(agents).toContain("agent-1") - expect(agents).toContain("agent-2") - expect(agents).toContain("agent-3") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - }) - - describe("shutdownAgent", () => { - it.effect("fails for non-existent agent", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - const result = yield* registry.shutdownAgent("non-existent" as AgentName).pipe( - Effect.either - ) - - expect(result._tag).toBe("Left") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - - it.effect("removes agent from registry", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - - yield* registry.getOrCreate("to-remove" as AgentName) - yield* registry.shutdownAgent("to-remove" as AgentName) - - const agents = yield* registry.list - expect(agents).not.toContain("to-remove") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - }) - - describe("shutdownAll", () => { - it.effect("clears all agents", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - - yield* registry.getOrCreate("agent-1" as AgentName) - yield* registry.getOrCreate("agent-2" as AgentName) - yield* registry.shutdownAll - - const agents = yield* registry.list - expect(agents).toEqual([]) - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - }) - - describe("test isolation", () => { - it.effect("first test creates agent", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - yield* registry.getOrCreate("isolation" as AgentName) - - const agents = yield* registry.list - expect(agents).toContain("isolation") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - - it.effect("second test has fresh registry", () => - Effect.gen(function*() { - const registry = yield* AgentRegistry - const agents = yield* registry.list - - // Should not see agent from previous test - expect(agents).not.toContain("isolation") - }).pipe( - Effect.scoped, - Effect.provide(TestLayer) - )) - }) -}) diff --git a/src/new-architecture/cli.e2e.test.ts b/src/new-architecture/cli.e2e.test.ts deleted file mode 100644 index 7937d17..0000000 --- a/src/new-architecture/cli.e2e.test.ts +++ /dev/null @@ -1,168 +0,0 @@ -/** - * CLI E2E Tests for new architecture. - * - * Tests the new actor-based CLI functionality. - * Uses mock LLM server by default, or real LLM with USE_REAL_LLM=1. - */ -import { Command } from "@effect/platform" -import { BunContext } from "@effect/platform-bun" -import { Effect, Stream } from "effect" -import * as fs from "node:fs" -import * as path from "node:path" -import { describe } from "vitest" -import { expect, test, useRealLlm } from "../../test/fixtures.js" - -const CLI_PATH = path.resolve(__dirname, "./cli.ts") - -interface CliResult { - stdout: string - stderr: string - exitCode: number -} - -/** Run CLI with full control over env and output capture */ -const runCli = ( - args: Array, - options: { cwd?: string; env?: Record } = {} -): Effect.Effect => { - // Inherit all parent env vars (including API keys from Doppler) - const env = { - ...process.env, - ...options.env - } - - // Build command - use workingDirectory instead of --cwd arg - let cmd = Command.make("bun", CLI_PATH, ...args) - if (options.cwd) cmd = Command.workingDirectory(cmd, options.cwd) - cmd = Command.env(cmd, env) - - return Effect.scoped( - Effect.gen(function*() { - const proc = yield* Command.start(cmd) - const [stdout, stderr, exitCode] = yield* Effect.all([ - proc.stdout.pipe(Stream.decodeText(), Stream.mkString), - proc.stderr.pipe(Stream.decodeText(), Stream.mkString), - proc.exitCode - ]) - return { stdout, stderr, exitCode } - }) - ).pipe( - Effect.provide(BunContext.layer), - Effect.catchAll((e) => Effect.succeed({ stdout: "", stderr: String(e), exitCode: 1 })) - ) -} - -/** Extract JSON objects from CLI response */ -const extractJsonOutput = (output: string): string => { - const jsonObjects: Array = [] - let depth = 0 - let start = -1 - - for (let i = 0; i < output.length; i++) { - if (output[i] === "{") { - if (depth === 0) start = i - depth++ - } else if (output[i] === "}") { - depth-- - if (depth === 0 && start !== -1) { - const obj = output.slice(start, i + 1) - if (obj.includes("\"_tag\"")) { - jsonObjects.push(obj) - } - start = -1 - } - } - } - - return jsonObjects.join("\n") -} - -describe("New Architecture CLI", () => { - describe("--help", () => { - test("shows help message", async () => { - const result = await Effect.runPromise(runCli(["--help"])) - expect(result.stdout).toContain("mini-agent-v2") - expect(result.stdout).toContain("chat") - }) - - test("shows chat help", async () => { - const result = await Effect.runPromise(runCli(["chat", "--help"])) - expect(result.stdout).toContain("--name") - expect(result.stdout).toContain("--message") - expect(result.stdout).toContain("--raw") - }) - }) - - describe("chat command", () => { - // Skip LLM tests when not using real LLM - mock server integration needs work - test.skipIf(!useRealLlm)("sends a message and gets a response", { timeout: 60000 }, async ({ testDir }) => { - const result = await Effect.runPromise( - runCli(["chat", "-n", "test-context", "-m", "Say exactly: TEST_V2_RESPONSE"], { cwd: testDir }) - ) - - expect(result.stdout.length).toBeGreaterThan(0) - expect(result.exitCode).toBe(0) - }) - - test.skipIf(!useRealLlm)("outputs JSON in raw mode", { timeout: 60000 }, async ({ testDir }) => { - const result = await Effect.runPromise( - runCli(["chat", "-n", "raw-test", "-m", "Say exactly: RAW_TEST", "--raw"], { cwd: testDir }) - ) - - expect(result.exitCode).toBe(0) - const jsonOutput = extractJsonOutput(result.stdout) - expect(jsonOutput).toContain("\"_tag\"") - expect(jsonOutput).toContain("\"AssistantMessageEvent\"") - }) - - test.skipIf(!useRealLlm)("creates context file", { timeout: 60000 }, async ({ testDir }) => { - await Effect.runPromise( - runCli(["chat", "-n", "persist-test", "-m", "Hello"], { cwd: testDir }) - ) - - // Context file should exist in v2 contexts dir - const contextsDir = path.join(testDir, ".mini-agent", "contexts-v2") - expect(fs.existsSync(contextsDir)).toBe(true) - - const files = fs.readdirSync(contextsDir) - expect(files.some((f) => f.includes("persist-test"))).toBe(true) - }) - - test.skipIf(!useRealLlm)("maintains conversation history", { timeout: 90000 }, async ({ testDir }) => { - // First message - await Effect.runPromise( - runCli(["chat", "-n", "history-test", "-m", "Remember: my secret code is ABC123"], { cwd: testDir }) - ) - - // Second message - const result = await Effect.runPromise( - runCli(["chat", "-n", "history-test", "-m", "What is my secret code?", "--raw"], { cwd: testDir }) - ) - - expect(result.exitCode).toBe(0) - expect(result.stdout.toLowerCase()).toContain("abc123") - }) - }) -}) - -describe("Multi-LLM", () => { - const llms = [ - { llm: "openai:gpt-4.1-mini" }, - { llm: "anthropic:claude-haiku-4-5" } - ] as const - - // Skip multi-LLM tests when not using real LLM - describe.skipIf(!useRealLlm).each(llms)("LLM: $llm", ({ llm }) => { - test( - "basic chat works", - { timeout: 60000 }, - async ({ testDir }) => { - const result = await Effect.runPromise( - runCli(["chat", "-n", "llm-test", "-m", "Say exactly: SUCCESS"], { cwd: testDir, env: { LLM: llm } }) - ) - expect(result.stdout.length).toBeGreaterThan(0) - expect(result.exitCode).toBe(0) - } - ) - }) -}) diff --git a/src/new-architecture/cli.ts b/src/new-architecture/cli.ts deleted file mode 100644 index fb26598..0000000 --- a/src/new-architecture/cli.ts +++ /dev/null @@ -1,195 +0,0 @@ -/** - * CLI wrapper for new architecture. - * - * Simple CLI entry point demonstrating the actor-based architecture. - * Usage: bun run src/new-architecture/cli.ts chat -n -m - */ - -import { AnthropicClient, AnthropicLanguageModel } from "@effect/ai-anthropic" -import { GoogleClient, GoogleLanguageModel } from "@effect/ai-google" -import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai" -import { Command, Options } from "@effect/cli" -import { FetchHttpClient, Terminal } from "@effect/platform" -import { BunContext, BunRuntime } from "@effect/platform-bun" -import { ConfigProvider, Effect, Fiber, Layer, LogLevel, Option, Stream } from "effect" -import { AppConfig, type MiniAgentConfig } from "../config.ts" -import { CurrentLlmConfig, getApiKey, type LlmConfig, resolveLlmConfig } from "../llm-config.ts" -import { createLoggingLayer } from "../logging.ts" -import { OpenAiChatClient, OpenAiChatLanguageModel } from "../openai-chat-completions-client.ts" -import { AgentRegistry } from "./agent-registry.ts" -import { type AgentName, EventBuilder } from "./domain.ts" -import { EventReducer } from "./event-reducer.ts" -import { EventStoreFileSystem } from "./event-store-fs.ts" -import { LlmTurnLive } from "./llm-turn.ts" - -const makeLanguageModelLayer = (llmConfig: LlmConfig) => { - const apiKey = getApiKey(llmConfig) - - switch (llmConfig.apiFormat) { - case "openai-responses": - return OpenAiLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "openai-chat-completions": - return OpenAiChatLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiChatClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "anthropic": - return AnthropicLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - AnthropicClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "gemini": - return GoogleLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - GoogleClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - } -} - -// CLI Options -const nameOption = Options.text("name").pipe( - Options.withAlias("n"), - Options.withDescription("Context/agent name"), - Options.withDefault("default") -) - -const messageOption = Options.text("message").pipe( - Options.withAlias("m"), - Options.withDescription("Message to send") -) - -const rawOption = Options.boolean("raw").pipe( - Options.withAlias("r"), - Options.withDescription("Output raw JSON events"), - Options.withDefault(false) -) - -const cwdOption = Options.directory("cwd").pipe( - Options.withDescription("Working directory"), - Options.optional -) - -// Chat command -const chatCommand = Command.make( - "chat", - { name: nameOption, message: messageOption, raw: rawOption }, - ({ message, name, raw }) => - Effect.gen(function*() { - const terminal = yield* Terminal.Terminal - const registry = yield* AgentRegistry - - const agentName = name as AgentName - const agent = yield* registry.getOrCreate(agentName) - - // Get current context to know event number - const ctx = yield* agent.getReducedContext - - // Subscribe to events BEFORE adding the user message - // Fork the stream consumption so it runs in parallel - const streamFiber = yield* agent.events.pipe( - Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), - Stream.tap((event) => { - if (raw) { - return terminal.display(JSON.stringify(event) + "\n") - } else if (event._tag === "TextDeltaEvent") { - return terminal.display(event.delta) - } else if (event._tag === "AssistantMessageEvent") { - return terminal.display("\n") - } - return Effect.void - }), - Stream.runDrain, - Effect.fork - ) - - // Add user message with triggersAgentTurn=true - const userEvent = EventBuilder.userMessage( - agentName, - agent.contextName, - ctx.nextEventNumber, - message - ) - yield* agent.addEvent(userEvent) - - // Wait for stream to complete - yield* Fiber.join(streamFiber) - }) -) - -// Root command -const cli = Command.make("mini-agent-v2", { cwd: cwdOption }).pipe( - Command.withSubcommands([chatCommand]) -) - -const cliApp = Command.run(cli, { - name: "mini-agent-v2", - version: "2.0.0" -}) - -// Default config for CLI -const defaultConfig: MiniAgentConfig = { - llm: "openai:gpt-4o-mini", - dataStorageDir: ".mini-agent", - configFile: "mini-agent.config.yaml", - cwd: Option.none(), - stdoutLogLevel: LogLevel.Warning, - fileLogLevel: LogLevel.Debug, - port: 3000, - host: "0.0.0.0", - layercodeWebhookSecret: Option.none() -} - -const appConfigLayer = Layer.succeed(AppConfig, defaultConfig) - -const program = Effect.gen(function*() { - const llmConfig = yield* resolveLlmConfig.pipe(Effect.withConfigProvider(ConfigProvider.fromEnv())) - yield* Effect.logDebug("Using LLM config", { provider: llmConfig.apiFormat, model: llmConfig.model }) - - const languageModelLayer = makeLanguageModelLayer(llmConfig) - const llmConfigLayer = CurrentLlmConfig.fromConfig(llmConfig) - - // Build the full layer stack - // AgentRegistry.Default requires EventStore, EventReducer, and MiniAgentTurn - const fullLayer = AgentRegistry.Default.pipe( - Layer.provide(LlmTurnLive), - Layer.provide(languageModelLayer), - Layer.provide(llmConfigLayer), - Layer.provide(EventStoreFileSystem), - Layer.provide(EventReducer.Default), - Layer.provide(appConfigLayer), - Layer.provide(BunContext.layer) - ) - - yield* cliApp(process.argv).pipe(Effect.provide(fullLayer)) -}) - -const loggingLayer = createLoggingLayer({ - stdoutLogLevel: LogLevel.Warning, - fileLogLevel: LogLevel.Debug, - baseDir: ".mini-agent" -}) - -const mainLayer = Layer.mergeAll(loggingLayer, BunContext.layer) - -program.pipe( - Effect.provide(mainLayer), - BunRuntime.runMain -) diff --git a/src/new-architecture/event-store.test.ts b/src/new-architecture/event-store.test.ts deleted file mode 100644 index 8ca7111..0000000 --- a/src/new-architecture/event-store.test.ts +++ /dev/null @@ -1,133 +0,0 @@ -/** - * EventStore Tests - * - * Tests the event persistence layer. - * Uses InMemory implementation for unit tests. - */ -import { describe, expect, it } from "@effect/vitest" -import { DateTime, Effect, Option } from "effect" -import type { AgentName, ContextName } from "./domain.ts" -import { makeEventId, UserMessageEvent } from "./domain.ts" -import { EventStore } from "./event-store.ts" - -const testAgentName = "test-agent" as AgentName -const testContextName = "test-context" as ContextName - -const makeTestEvent = (eventNumber: number, content: string) => - new UserMessageEvent({ - id: makeEventId(testContextName, eventNumber), - timestamp: DateTime.unsafeNow(), - agentName: testAgentName, - parentEventId: Option.none(), - triggersAgentTurn: true, - content - }) - -describe("EventStore", () => { - describe("load", () => { - it.effect("returns empty array for non-existent context", () => - Effect.gen(function*() { - const store = yield* EventStore - const events = yield* store.load("non-existent" as ContextName) - expect(events).toEqual([]) - }).pipe(Effect.provide(EventStore.InMemory))) - - it.effect("returns saved events after append", () => - Effect.gen(function*() { - const store = yield* EventStore - const events = [makeTestEvent(0, "Hello")] - - yield* store.append(testContextName, events) - const loaded = yield* store.load(testContextName) - - expect(loaded).toHaveLength(1) - expect(loaded[0]?._tag).toBe("UserMessageEvent") - }).pipe(Effect.provide(EventStore.InMemory))) - }) - - describe("append", () => { - it.effect("appends events to existing context", () => - Effect.gen(function*() { - const store = yield* EventStore - const firstBatch = [makeTestEvent(0, "First")] - const secondBatch = [makeTestEvent(1, "Second")] - - yield* store.append(testContextName, firstBatch) - yield* store.append(testContextName, secondBatch) - - const loaded = yield* store.load(testContextName) - expect(loaded).toHaveLength(2) - }).pipe(Effect.provide(EventStore.InMemory))) - - it.effect("appends multiple events at once", () => - Effect.gen(function*() { - const store = yield* EventStore - const events = [ - makeTestEvent(0, "First"), - makeTestEvent(1, "Second"), - makeTestEvent(2, "Third") - ] - - yield* store.append(testContextName, events) - const loaded = yield* store.load(testContextName) - - expect(loaded).toHaveLength(3) - }).pipe(Effect.provide(EventStore.InMemory))) - }) - - describe("exists", () => { - it.effect("returns false for non-existent context", () => - Effect.gen(function*() { - const store = yield* EventStore - const exists = yield* store.exists("non-existent" as ContextName) - expect(exists).toBe(false) - }).pipe(Effect.provide(EventStore.InMemory))) - - it.effect("returns true after events are appended", () => - Effect.gen(function*() { - const store = yield* EventStore - yield* store.append(testContextName, [makeTestEvent(0, "Hello")]) - - const exists = yield* store.exists(testContextName) - expect(exists).toBe(true) - }).pipe(Effect.provide(EventStore.InMemory))) - }) - - describe("isolation", () => { - it.effect("different contexts are independent", () => - Effect.gen(function*() { - const store = yield* EventStore - const context1 = "context-1" as ContextName - const context2 = "context-2" as ContextName - - yield* store.append(context1, [makeTestEvent(0, "Context 1")]) - yield* store.append(context2, [makeTestEvent(0, "Context 2")]) - - const loaded1 = yield* store.load(context1) - const loaded2 = yield* store.load(context2) - - expect(loaded1).toHaveLength(1) - expect(loaded2).toHaveLength(1) - expect((loaded1[0] as UserMessageEvent).content).toBe("Context 1") - expect((loaded2[0] as UserMessageEvent).content).toBe("Context 2") - }).pipe(Effect.provide(EventStore.InMemory))) - }) - - describe("test isolation", () => { - it.effect("first test creates data", () => - Effect.gen(function*() { - const store = yield* EventStore - yield* store.append(testContextName, [makeTestEvent(0, "Isolation test")]) - - const loaded = yield* store.load(testContextName) - expect(loaded).toHaveLength(1) - }).pipe(Effect.provide(EventStore.InMemory))) - - it.effect("second test has fresh state", () => - Effect.gen(function*() { - const store = yield* EventStore - const loaded = yield* store.load(testContextName) - expect(loaded).toEqual([]) - }).pipe(Effect.provide(EventStore.InMemory))) - }) -}) diff --git a/src/new-architecture/http.ts b/src/new-architecture/http.ts deleted file mode 100644 index 1913b5e..0000000 --- a/src/new-architecture/http.ts +++ /dev/null @@ -1,112 +0,0 @@ -/** - * HTTP Server for new architecture. - * - * Endpoints: - * - POST /agent/:agentName - Send message, receive SSE stream of events - * - GET /health - Health check - */ - -import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" -import { Effect, Schema, Stream } from "effect" -import { AgentRegistry } from "./agent-registry.ts" -import { type AgentName, type ContextEvent, EventBuilder } from "./domain.ts" - -/** Encode a ContextEvent as an SSE data line */ -const encodeSSE = (event: ContextEvent): Uint8Array => new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`) - -/** Input message schema */ -const InputMessage = Schema.Struct({ - _tag: Schema.Literal("UserMessage"), - content: Schema.String -}) -type InputMessage = typeof InputMessage.Type - -/** Parse JSON body into InputMessage */ -const parseBody = (body: string) => - Effect.gen(function*() { - const json = yield* Effect.try({ - try: () => JSON.parse(body) as unknown, - catch: (e) => new Error(`Invalid JSON: ${e instanceof Error ? e.message : String(e)}`) - }) - return yield* Schema.decodeUnknown(InputMessage)(json) - }) - -/** Handler for POST /agent/:agentName */ -const agentHandler = Effect.gen(function*() { - const request = yield* HttpServerRequest.HttpServerRequest - const registry = yield* AgentRegistry - const params = yield* HttpRouter.params - - const agentName = params.agentName - if (!agentName) { - return HttpServerResponse.text("Missing agentName", { status: 400 }) - } - - yield* Effect.logDebug("POST /agent/:agentName", { agentName }) - - // Read body - const body = yield* request.text - - if (body.trim() === "") { - return HttpServerResponse.text("Empty request body", { status: 400 }) - } - - const parseResult = yield* parseBody(body).pipe(Effect.either) - - if (parseResult._tag === "Left") { - return HttpServerResponse.text(parseResult.left.message, { status: 400 }) - } - - const message = parseResult.right - - // Get or create agent - const agent = yield* registry.getOrCreate(agentName as AgentName) - const ctx = yield* agent.getReducedContext - - // Prepare user event - const userEvent = EventBuilder.userMessage( - agentName as AgentName, - agent.contextName, - ctx.nextEventNumber, - message.content - ) - - // Create an SSE stream that: - // 1. First emits the user event (so client sees it immediately) - // 2. Adds the event to the agent (which triggers the LLM turn) - // 3. Then streams all subsequent events until turn completes - const sseStream = Stream.concat( - // Emit user event immediately to client, then add it to agent - Stream.fromEffect( - agent.addEvent(userEvent).pipe( - Effect.as(encodeSSE(userEvent)), - Effect.catchAll(() => Effect.succeed(encodeSSE(userEvent))) - ) - ), - // Stream remaining events (the broadcast will include events after UserMessage) - agent.events.pipe( - Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), - Stream.map(encodeSSE) - ) - ) - - return HttpServerResponse.stream(sseStream, { - contentType: "text/event-stream", - headers: { - "Cache-Control": "no-cache", - "Connection": "keep-alive" - } - }) -}) - -/** Health check endpoint */ -const healthHandler = Effect.gen(function*() { - yield* Effect.logDebug("GET /health") - return yield* HttpServerResponse.json({ status: "ok" }) -}) - -/** HTTP router for new architecture */ -export const makeRouterV2 = HttpRouter.empty.pipe( - HttpRouter.post("/agent/:agentName", agentHandler), - HttpRouter.get("/health", healthHandler) -) diff --git a/src/new-architecture/index.ts b/src/new-architecture/index.ts deleted file mode 100644 index 8f16265..0000000 --- a/src/new-architecture/index.ts +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Mini-Agent Actor Architecture - * - * Event-sourced actor system for LLM interactions. - * Philosophy: "Agent events are all you need" - * - * @module - */ - -// Domain types and events -export { - // Config types - AgentConfig, - // Error types - AgentError, - // Branded types - AgentName, - AgentNotFoundError, - // Event types - AgentTurnCompletedEvent, - AgentTurnFailedEvent, - AgentTurnInterruptedEvent, - AgentTurnNumber, - AgentTurnStartedEvent, - AssistantMessageEvent, - ContextEvent, - ContextLoadError, - ContextName, - ContextSaveError, - defaultAgentConfig, - // Utilities - EventBuilder, - EventId, - InterruptReason, - LlmProviderConfig, - LlmProviderId, - makeEventId, - // Service types - type MiniAgent, - MiniAgentTurn, - // State types - type ReducedContext, - ReducedContext as ReducedContextHelpers, - ReducerError, - SessionEndedEvent, - SessionStartedEvent, - SetLlmConfigEvent, - SetTimeoutEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "./domain.ts" - -// Services -export { AgentRegistry } from "./agent-registry.ts" -export { EventReducer } from "./event-reducer.ts" -export { EventStore } from "./event-store.ts" - -// Agent factory -export { makeMiniAgent } from "./mini-agent.ts" - -// Production layers -export { EventStoreFileSystem } from "./event-store-fs.ts" -export { LlmTurnLive } from "./llm-turn.ts" diff --git a/src/new-architecture/server.e2e.test.ts b/src/new-architecture/server.e2e.test.ts deleted file mode 100644 index b025ca4..0000000 --- a/src/new-architecture/server.e2e.test.ts +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Server E2E Tests for new architecture. - * - * Tests the HTTP server functionality. - * Uses mock LLM server by default, or real LLM with USE_REAL_LLM=1. - */ -import { spawn } from "child_process" -import * as path from "node:path" -import { describe } from "vitest" -import { expect, test, useRealLlm } from "../../test/fixtures.js" - -const SERVER_PATH = path.resolve(__dirname, "./server.ts") - -// Port counter to avoid conflicts -let portCounter = 5000 + Math.floor(Math.random() * 1000) -const getNextPort = () => portCounter++ - -/** Start the server in background */ -const startServer = async ( - cwd: string -): Promise<{ port: number; cleanup: () => Promise }> => { - const port = getNextPort() - - const proc = spawn("bun", [SERVER_PATH, "--port", String(port)], { - cwd, - env: { - ...process.env - }, - stdio: "ignore" - }) - - const cleanup = async () => { - if (!proc.killed) { - proc.kill("SIGTERM") - await new Promise((resolve) => { - const timeout = setTimeout(() => { - proc.kill("SIGKILL") - resolve() - }, 2000) - proc.on("exit", () => { - clearTimeout(timeout) - resolve() - }) - }) - } - } - - // Wait for server to be ready - for (let i = 0; i < 100; i++) { - try { - const res = await fetch(`http://localhost:${port}/health`) - if (res.ok) { - await new Promise((resolve) => setTimeout(resolve, 50)) - return { port, cleanup } - } - } catch { - // Server not ready yet - } - await new Promise((resolve) => setTimeout(resolve, 100)) - } - - await cleanup() - throw new Error(`Server failed to start on port ${port}`) -} - -/** Parse SSE stream from response */ -const parseSSE = async (response: Response): Promise> => { - const text = await response.text() - return text - .split("\n") - .filter((line) => line.startsWith("data: ")) - .map((line) => line.slice(6)) -} - -describe("New Architecture Server", () => { - test("health endpoint returns ok", { timeout: 30000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const response = await fetch(`http://localhost:${port}/health`) - const body = (await response.json()) as { status: string } - - expect(response.status).toBe(200) - expect(body.status).toBe("ok") - } finally { - await cleanup() - } - }) - - // Skip LLM tests when not using real LLM - mock server integration needs work - test.skipIf(!useRealLlm)("agent endpoint processes message", { timeout: 60000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const response = await fetch(`http://localhost:${port}/agent/test-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ _tag: "UserMessage", content: "Say exactly: HELLO_V2" }) - }) - - expect(response.status).toBe(200) - expect(response.headers.get("content-type")).toContain("text/event-stream") - - const events = await parseSSE(response) - expect(events.length).toBeGreaterThan(0) - - // Should have AssistantMessageEvent - const hasAssistant = events.some((e) => e.includes("\"AssistantMessageEvent\"")) - expect(hasAssistant).toBe(true) - } finally { - await cleanup() - } - }) - - test("agent endpoint returns 400 for empty body", { timeout: 30000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const response = await fetch(`http://localhost:${port}/agent/test-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: "" - }) - - expect(response.status).toBe(400) - } finally { - await cleanup() - } - }) - - test("agent endpoint returns 400 for invalid JSON", { timeout: 30000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const response = await fetch(`http://localhost:${port}/agent/test-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: "not json" - }) - - expect(response.status).toBe(400) - } finally { - await cleanup() - } - }) - - test.skipIf(!useRealLlm)( - "maintains conversation history across requests", - { timeout: 90000 }, - async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - // First message - await fetch(`http://localhost:${port}/agent/history-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ _tag: "UserMessage", content: "Remember: my code is XYZ789" }) - }) - - // Second message - const response = await fetch(`http://localhost:${port}/agent/history-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ _tag: "UserMessage", content: "What is my code?" }) - }) - - const events = await parseSSE(response) - const fullResponse = events.join("") - - expect(fullResponse.toLowerCase()).toContain("xyz789") - } finally { - await cleanup() - } - } - ) -}) diff --git a/src/new-architecture/server.ts b/src/new-architecture/server.ts deleted file mode 100644 index 8ebadde..0000000 --- a/src/new-architecture/server.ts +++ /dev/null @@ -1,129 +0,0 @@ -/** - * HTTP Server entry point for new architecture. - * - * Usage: bun run src/new-architecture/server.ts [--port PORT] - */ - -import { AnthropicClient, AnthropicLanguageModel } from "@effect/ai-anthropic" -import { GoogleClient, GoogleLanguageModel } from "@effect/ai-google" -import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai" -import { FetchHttpClient, HttpServer } from "@effect/platform" -import { BunContext, BunHttpServer, BunRuntime } from "@effect/platform-bun" -import { ConfigProvider, Effect, Layer, LogLevel, Option } from "effect" -import { AppConfig, type MiniAgentConfig } from "../config.ts" -import { CurrentLlmConfig, getApiKey, type LlmConfig, resolveLlmConfig } from "../llm-config.ts" -import { createLoggingLayer } from "../logging.ts" -import { OpenAiChatClient, OpenAiChatLanguageModel } from "../openai-chat-completions-client.ts" -import { AgentRegistry } from "./agent-registry.ts" -import { EventReducer } from "./event-reducer.ts" -import { EventStoreFileSystem } from "./event-store-fs.ts" -import { makeRouterV2 } from "./http.ts" -import { LlmTurnLive } from "./llm-turn.ts" - -const makeLanguageModelLayer = (llmConfig: LlmConfig) => { - const apiKey = getApiKey(llmConfig) - - switch (llmConfig.apiFormat) { - case "openai-responses": - return OpenAiLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "openai-chat-completions": - return OpenAiChatLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiChatClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "anthropic": - return AnthropicLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - AnthropicClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "gemini": - return GoogleLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - GoogleClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - } -} - -// Parse port from args -const port = (() => { - const portIdx = process.argv.indexOf("--port") - if (portIdx !== -1 && process.argv[portIdx + 1]) { - return parseInt(process.argv[portIdx + 1]!, 10) - } - return 3001 -})() - -// Default config for server -const defaultConfig: MiniAgentConfig = { - llm: "openai:gpt-4o-mini", - dataStorageDir: ".mini-agent", - configFile: "mini-agent.config.yaml", - cwd: Option.none(), - stdoutLogLevel: LogLevel.Warning, - fileLogLevel: LogLevel.Debug, - port, - host: "0.0.0.0", - layercodeWebhookSecret: Option.none() -} - -const appConfigLayer = Layer.succeed(AppConfig, defaultConfig) - -const program = Effect.gen(function*() { - const llmConfig = yield* resolveLlmConfig.pipe(Effect.withConfigProvider(ConfigProvider.fromEnv())) - yield* Effect.log(`Starting server on port ${port}`) - yield* Effect.logDebug("Using LLM config", { provider: llmConfig.apiFormat, model: llmConfig.model }) - - const languageModelLayer = makeLanguageModelLayer(llmConfig) - const llmConfigLayer = CurrentLlmConfig.fromConfig(llmConfig) - - // Build the full layer stack - // AgentRegistry.Default requires EventStore, EventReducer, and MiniAgentTurn - const serviceLayer = AgentRegistry.Default.pipe( - Layer.provide(LlmTurnLive), - Layer.provide(languageModelLayer), - Layer.provide(llmConfigLayer), - Layer.provide(EventStoreFileSystem), - Layer.provide(EventReducer.Default), - Layer.provide(appConfigLayer), - Layer.provide(BunContext.layer) - ) - - // HTTP server layer - const serverLayer = HttpServer.serve(makeRouterV2).pipe( - Layer.provide(BunHttpServer.layer({ port })), - Layer.provide(serviceLayer) - ) - - return yield* Layer.launch(serverLayer) -}) - -const loggingLayer = createLoggingLayer({ - stdoutLogLevel: LogLevel.Info, - fileLogLevel: LogLevel.Debug, - baseDir: ".mini-agent" -}) - -const mainLayer = Layer.mergeAll(loggingLayer, BunContext.layer) - -program.pipe( - Effect.provide(mainLayer), - BunRuntime.runMain -) diff --git a/src/server.service.ts b/src/server.service.ts deleted file mode 100644 index 18e22ff..0000000 --- a/src/server.service.ts +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Generic HTTP Server Service - * - * Provides the same abstraction level as the CLI for handling agent requests. - * Accepts JSONL events (like script mode) and streams back ContextEvents. - */ -import type { AiError, LanguageModel } from "@effect/ai" -import type { Error as PlatformError, FileSystem } from "@effect/platform" -import { Context, Effect, Layer, Schema, Stream } from "effect" -import type { ContextEvent, InputEvent } from "./context.model.ts" -import { SystemPromptEvent, UserMessageEvent } from "./context.model.ts" -import { ContextService } from "./context.service.ts" -import type { ContextLoadError, ContextSaveError } from "./errors.ts" -import type { CurrentLlmConfig } from "./llm-config.ts" - -/** Script mode input events - schema for HTTP parsing */ -export const ScriptInputEvent = Schema.Union(UserMessageEvent, SystemPromptEvent) -export type ScriptInputEvent = typeof ScriptInputEvent.Type - -export class AgentServer extends Context.Tag("@app/AgentServer")< - AgentServer, - { - /** - * Handle a request with input events, streaming back ContextEvents. - * Same semantics as CLI script mode. - * - * Note: The returned stream requires LanguageModel, FileSystem, and CurrentLlmConfig - * to be provided before running. - */ - readonly handleRequest: ( - contextName: string, - events: ReadonlyArray - ) => Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig - > - } ->() { - static readonly layer = Layer.effect( - AgentServer, - Effect.gen(function*() { - const contextService = yield* ContextService - - const handleRequest = ( - contextName: string, - events: ReadonlyArray - ) => contextService.addEvents(contextName, events) - - return AgentServer.of({ handleRequest }) - }) - ) - - static readonly testLayer = Layer.sync(AgentServer, () => - AgentServer.of({ - handleRequest: (_contextName, _events) => Stream.empty - })) -} diff --git a/test/cli.e2e.test.ts b/test/cli.e2e.test.ts index cdd168f..0684aa4 100644 --- a/test/cli.e2e.test.ts +++ b/test/cli.e2e.test.ts @@ -133,7 +133,7 @@ describe("CLI", () => { const jsonOutput = extractJsonOutput(result.stdout) // Should contain JSON with _tag field expect(jsonOutput).toContain("\"_tag\"") - expect(jsonOutput).toContain("\"AssistantMessage\"") + expect(jsonOutput).toContain("\"AssistantMessageEvent\"") }) test("includes ephemeral events with --show-ephemeral", { timeout: 15000 }, async ({ llmEnv, testDir }) => { @@ -147,7 +147,7 @@ describe("CLI", () => { expect(result.exitCode).toBe(0) const jsonOutput = extractJsonOutput(result.stdout) // Should contain TextDelta events when showing ephemeral - expect(jsonOutput).toContain("\"TextDelta\"") + expect(jsonOutput).toContain("\"TextDeltaEvent\"") }) }) @@ -215,7 +215,7 @@ describe("CLI", () => { // Should echo the input event and have AssistantMessage response expect(output).toContain("\"UserMessage\"") - expect(output).toContain("\"AssistantMessage\"") + expect(output).toContain("\"AssistantMessageEvent\"") }) test("handles multiple UserMessage events in sequence", { timeout: 15000 }, async ({ llmEnv, testDir }) => { @@ -239,7 +239,7 @@ describe("CLI", () => { const jsonLines = extractJsonLines(output) // Should have at least two AssistantMessage events (one per input) - const assistantMessages = jsonLines.filter((line) => line.includes("\"AssistantMessage\"")) + const assistantMessages = jsonLines.filter((line) => line.includes("\"AssistantMessageEvent\"")) expect(assistantMessages.length).toBeGreaterThanOrEqual(2) // Second response should mention the secret code @@ -270,7 +270,7 @@ describe("CLI", () => { // Should echo both events expect(output).toContain("\"SystemPrompt\"") expect(output).toContain("\"UserMessage\"") - expect(output).toContain("\"AssistantMessage\"") + expect(output).toContain("\"AssistantMessageEvent\"") }) test("includes TextDelta streaming events by default", { timeout: 15000 }, async ({ llmEnv, testDir }) => { @@ -290,9 +290,9 @@ describe("CLI", () => { ) // Script mode should include TextDelta events (streaming chunks) by default - expect(output).toContain("\"TextDelta\"") + expect(output).toContain("\"TextDeltaEvent\"") expect(output).toContain("\"delta\"") - expect(output).toContain("\"AssistantMessage\"") + expect(output).toContain("\"AssistantMessageEvent\"") }) }) @@ -324,7 +324,7 @@ describe("CLI", () => { expect(result.exitCode).toBe(0) const jsonOutput = extractJsonOutput(result.stdout) // Response should be JSON with AssistantMessage containing "blue" - expect(jsonOutput).toContain("\"AssistantMessage\"") + expect(jsonOutput).toContain("\"AssistantMessageEvent\"") expect(jsonOutput.toLowerCase()).toContain("blue") }) }) @@ -400,13 +400,28 @@ describe("Interrupted response context", () => { const contextsDir = path.join(testDir, ".mini-agent", "contexts") fs.mkdirSync(contextsDir, { recursive: true }) + // New architecture uses full event format with all required fields + // parentEventId is omitted when None (Schema.optionalWith encodes None as undefined) + // partialResponse is just a string when present const contextContent = `events: - - _tag: SystemPrompt + - _tag: SystemPromptEvent + id: "${contextName}:0000" + timestamp: "2024-01-01T00:00:00.000Z" + agentName: "${contextName}" + triggersAgentTurn: false content: You are a helpful assistant. - - _tag: UserMessage + - _tag: UserMessageEvent + id: "${contextName}:0001" + timestamp: "2024-01-01T00:00:01.000Z" + agentName: "${contextName}" + triggersAgentTurn: true content: Tell me a random 8-digit number followed by a long story. - - _tag: LLMRequestInterrupted - requestId: test-request-123 + - _tag: AgentTurnInterruptedEvent + id: "${contextName}:0002" + timestamp: "2024-01-01T00:00:02.000Z" + agentName: "${contextName}" + triggersAgentTurn: false + turnNumber: 1 reason: user_cancel partialResponse: "${testNumber}! Once upon a time in a faraway land, there lived a wise old wizard who..." ` @@ -414,7 +429,7 @@ describe("Interrupted response context", () => { // Now make a follow-up request asking about the number. // The LLM should know the number because: - // 1. The LLMRequestInterruptedEvent's partialResponse is included as an assistant message + // 1. The AgentTurnInterruptedEvent's partialResponse is included as an assistant message // 2. A user message explains the interruption happened const result = await Effect.runPromise( runCli( diff --git a/src/new-architecture/event-reducer.test.ts b/test/event-reducer.test.ts similarity index 78% rename from src/new-architecture/event-reducer.test.ts rename to test/event-reducer.test.ts index ab8efd2..09bd41e 100644 --- a/src/new-architecture/event-reducer.test.ts +++ b/test/event-reducer.test.ts @@ -3,11 +3,13 @@ * * Tests the pure reducer that folds events into ReducedContext. */ +import { Prompt } from "@effect/ai" import { describe, expect, it } from "@effect/vitest" import { DateTime, Effect, Option, Redacted } from "effect" -import type { AgentName, AgentTurnNumber, ContextName, LlmProviderId } from "./domain.ts" +import type { AgentName, AgentTurnNumber, ContextName, LlmProviderId } from "../src/domain.ts" import { AgentTurnCompletedEvent, + AgentTurnInterruptedEvent, AgentTurnStartedEvent, AssistantMessageEvent, makeEventId, @@ -16,8 +18,8 @@ import { SetTimeoutEvent, SystemPromptEvent, UserMessageEvent -} from "./domain.ts" -import { EventReducer } from "./event-reducer.ts" +} from "../src/domain.ts" +import { EventReducer } from "../src/event-reducer.ts" const testAgentName = "test-agent" as AgentName const testContextName = "test-context" as ContextName @@ -229,20 +231,60 @@ describe("EventReducer", () => { }).pipe(Effect.provide(EventReducer.Default))) }) - describe("idempotency", () => { - it.effect("reducing same events twice yields same result", () => + describe("AgentTurnInterruptedEvent", () => { + it.effect("adds partial response and interruption explainer", () => Effect.gen(function*() { const reducer = yield* EventReducer - const events = [ - new SystemPromptEvent({ ...baseFields(0), content: "System" }), - new UserMessageEvent({ ...baseFields(1), content: "Hello" }) - ] + const userEvent = new UserMessageEvent({ + ...baseFields(0), + content: "Tell me a story." + }) - const result1 = yield* reducer.reduce(reducer.initialReducedContext, events) - const result2 = yield* reducer.reduce(reducer.initialReducedContext, events) + const afterUser = yield* reducer.reduce(reducer.initialReducedContext, [userEvent]) - expect(result1.messages.length).toBe(result2.messages.length) - expect(result1.nextEventNumber).toBe(result2.nextEventNumber) + const interruptedEvent = new AgentTurnInterruptedEvent({ + ...baseFields(1), + turnNumber: 1 as AgentTurnNumber, + reason: "user_cancel", + partialResponse: Option.some("Once upon a time...") + }) + + const result = yield* reducer.reduce(afterUser, [interruptedEvent]) + + expect(result.messages).toHaveLength(3) + const assistantMessage = result.messages[result.messages.length - 2] + expect(assistantMessage?.role).toBe("assistant") + expect(assistantMessage?.content).toEqual([Prompt.textPart({ text: "Once upon a time..." })]) + + const explanationMessage = result.messages[result.messages.length - 1] + expect(explanationMessage?.role).toBe("user") + expect(explanationMessage?.content).toEqual([ + Prompt.textPart({ + text: "The previous assistant response was interrupted (user cancelled the response). Please continue from where it stopped." + }) + ]) + }).pipe(Effect.provide(EventReducer.Default))) + + it.effect("adds interruption explainer even without partial response", () => + Effect.gen(function*() { + const reducer = yield* EventReducer + const interruptedEvent = new AgentTurnInterruptedEvent({ + ...baseFields(0), + turnNumber: 1 as AgentTurnNumber, + reason: "timeout", + partialResponse: Option.none() + }) + + const result = yield* reducer.reduce(reducer.initialReducedContext, [interruptedEvent]) + + expect(result.messages).toHaveLength(1) + const explanationMessage = result.messages[0] + expect(explanationMessage?.role).toBe("user") + expect(explanationMessage?.content).toEqual([ + Prompt.textPart({ + text: "The previous assistant response was interrupted (the response timed out). Please continue from where it stopped." + }) + ]) }).pipe(Effect.provide(EventReducer.Default))) }) }) diff --git a/test/event-store.test.ts b/test/event-store.test.ts new file mode 100644 index 0000000..4d910a2 --- /dev/null +++ b/test/event-store.test.ts @@ -0,0 +1,156 @@ +/** + * EventStore Tests + * + * Tests the event store interface using the InMemory implementation. + * FileSystem implementation tested via e2e tests. + */ +import { describe, expect, it } from "@effect/vitest" +import { Effect } from "effect" +import type { AgentName, AgentTurnNumber, ContextName } from "../src/domain.ts" +import { EventBuilder } from "../src/domain.ts" +import { EventStore } from "../src/event-store.ts" + +const testAgentName = "test-agent" as AgentName +const testContextName = "test-context" as ContextName + +describe("EventStore.InMemory", () => { + describe("load", () => { + it.effect("returns empty array for non-existent context", () => + Effect.gen(function*() { + const store = yield* EventStore + const events = yield* store.load(testContextName) + expect(events).toEqual([]) + }).pipe(Effect.provide(EventStore.InMemory))) + + it.effect("returns events after append", () => + Effect.gen(function*() { + const store = yield* EventStore + const event = EventBuilder.userMessage(testAgentName, testContextName, 0, "Hello") + + yield* store.append(testContextName, [event]) + const events = yield* store.load(testContextName) + + expect(events.length).toBe(1) + expect(events[0]?._tag).toBe("UserMessageEvent") + }).pipe(Effect.provide(EventStore.InMemory))) + }) + + describe("append", () => { + it.effect("appends multiple events", () => + Effect.gen(function*() { + const store = yield* EventStore + + const events = [ + EventBuilder.userMessage(testAgentName, testContextName, 0, "First"), + EventBuilder.userMessage(testAgentName, testContextName, 1, "Second") + ] + + yield* store.append(testContextName, events) + const loaded = yield* store.load(testContextName) + + expect(loaded.length).toBe(2) + }).pipe(Effect.provide(EventStore.InMemory))) + + it.effect("appends incrementally", () => + Effect.gen(function*() { + const store = yield* EventStore + + yield* store.append(testContextName, [ + EventBuilder.userMessage(testAgentName, testContextName, 0, "First") + ]) + yield* store.append(testContextName, [ + EventBuilder.userMessage(testAgentName, testContextName, 1, "Second") + ]) + + const loaded = yield* store.load(testContextName) + expect(loaded.length).toBe(2) + }).pipe(Effect.provide(EventStore.InMemory))) + + it.effect("preserves event content", () => + Effect.gen(function*() { + const store = yield* EventStore + const content = "Test message with special chars: 日本語 🎉" + const event = EventBuilder.userMessage(testAgentName, testContextName, 0, content) + + yield* store.append(testContextName, [event]) + const [loaded] = yield* store.load(testContextName) + + expect(loaded?._tag).toBe("UserMessageEvent") + if (loaded?._tag === "UserMessageEvent") { + expect(loaded.content).toBe(content) + } + }).pipe(Effect.provide(EventStore.InMemory))) + }) + + describe("exists", () => { + it.effect("returns false for non-existent context", () => + Effect.gen(function*() { + const store = yield* EventStore + const exists = yield* store.exists(testContextName) + expect(exists).toBe(false) + }).pipe(Effect.provide(EventStore.InMemory))) + + it.effect("returns true after append", () => + Effect.gen(function*() { + const store = yield* EventStore + const event = EventBuilder.userMessage(testAgentName, testContextName, 0, "Hello") + + yield* store.append(testContextName, [event]) + const exists = yield* store.exists(testContextName) + + expect(exists).toBe(true) + }).pipe(Effect.provide(EventStore.InMemory))) + }) + + describe("isolation", () => { + it.effect("different contexts are isolated", () => + Effect.gen(function*() { + const store = yield* EventStore + const context1 = "context-1" as ContextName + const context2 = "context-2" as ContextName + + yield* store.append(context1, [ + EventBuilder.userMessage(testAgentName, context1, 0, "Context 1") + ]) + yield* store.append(context2, [ + EventBuilder.userMessage(testAgentName, context2, 0, "Context 2 - 1"), + EventBuilder.userMessage(testAgentName, context2, 1, "Context 2 - 2") + ]) + + const events1 = yield* store.load(context1) + const events2 = yield* store.load(context2) + + expect(events1.length).toBe(1) + expect(events2.length).toBe(2) + }).pipe(Effect.provide(EventStore.InMemory))) + }) + + describe("event types", () => { + it.effect("stores all event types", () => + Effect.gen(function*() { + const store = yield* EventStore + + const events = [ + EventBuilder.systemPrompt(testAgentName, testContextName, 0, "System"), + EventBuilder.userMessage(testAgentName, testContextName, 1, "User"), + EventBuilder.assistantMessage(testAgentName, testContextName, 2, "Assistant"), + EventBuilder.textDelta(testAgentName, testContextName, 3, "Delta"), + EventBuilder.agentTurnStarted(testAgentName, testContextName, 4, 1 as AgentTurnNumber), + EventBuilder.agentTurnCompleted(testAgentName, testContextName, 5, 1 as AgentTurnNumber, 100) + ] + + yield* store.append(testContextName, events) + const loaded = yield* store.load(testContextName) + + expect(loaded.length).toBe(6) + expect(loaded.map((e) => e._tag)).toEqual([ + "SystemPromptEvent", + "UserMessageEvent", + "AssistantMessageEvent", + "TextDeltaEvent", + "AgentTurnStartedEvent", + "AgentTurnCompletedEvent" + ]) + }).pipe(Effect.provide(EventStore.InMemory))) + }) +}) diff --git a/test/llm.test.ts b/test/llm.test.ts deleted file mode 100644 index 2d060c5..0000000 --- a/test/llm.test.ts +++ /dev/null @@ -1,169 +0,0 @@ -/** - * LLM Module Tests - */ -import { BunContext } from "@effect/platform-bun" -import { describe, expect, it } from "@effect/vitest" -import { Effect } from "effect" -import { - AssistantMessageEvent, - LLMRequestInterruptedEvent, - SystemPromptEvent, - UserMessageEvent -} from "../src/context.model.ts" -import { eventsToPrompt } from "../src/llm.ts" - -const testLayer = BunContext.layer - -describe("eventsToPrompt", () => { - it.effect("converts basic conversation to prompt", () => - Effect.gen(function*() { - const events = [ - new SystemPromptEvent({ content: "You are helpful" }), - new UserMessageEvent({ content: "Hello" }), - new AssistantMessageEvent({ content: "Hi there!" }), - new UserMessageEvent({ content: "How are you?" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - expect(messages).toHaveLength(4) - expect(messages[0]?.role).toBe("system") - expect(messages[1]?.role).toBe("user") - expect(messages[2]?.role).toBe("assistant") - expect(messages[3]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) - - it.effect("includes interrupted response as assistant message", () => - Effect.gen(function*() { - const events = [ - new SystemPromptEvent({ content: "You are helpful" }), - new UserMessageEvent({ content: "Tell me a story" }), - new LLMRequestInterruptedEvent({ - requestId: "test-123", - reason: "user_cancel", - partialResponse: "Once upon a time, there was a" - }), - new UserMessageEvent({ content: "Continue" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // Should have: system, user, assistant (partial), user (interruption notice), user (new message) - expect(messages).toHaveLength(5) - - expect(messages[0]?.role).toBe("system") - expect(messages[1]?.role).toBe("user") - - // Interrupted response becomes assistant message - const assistantMsg = messages[2] - expect(assistantMsg?.role).toBe("assistant") - if (assistantMsg?.role === "assistant") { - const assistantContent = assistantMsg.content - expect(Array.isArray(assistantContent)).toBe(true) - const firstPart = assistantContent[0] - expect(firstPart?.type).toBe("text") - if (firstPart?.type === "text") { - expect(firstPart.text).toBe("Once upon a time, there was a") - } - } - - // Interruption notice is injected as user message - const noticeMsg = messages[3] - expect(noticeMsg?.role).toBe("user") - if (noticeMsg?.role === "user") { - const noticeContent = noticeMsg.content - expect(Array.isArray(noticeContent)).toBe(true) - const firstPart = noticeContent[0] - expect(firstPart?.type).toBe("text") - if (firstPart?.type === "text") { - expect(firstPart.text).toContain("Your previous response was interrupted") - expect(firstPart.text).toContain("Once upon a time, there was a") - } - } - - // New user message - expect(messages[4]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) - - it.effect("handles user_new_message interruption reason", () => - Effect.gen(function*() { - const events = [ - new UserMessageEvent({ content: "Start" }), - new LLMRequestInterruptedEvent({ - requestId: "test-456", - reason: "user_new_message", - partialResponse: "I was saying" - }), - new UserMessageEvent({ content: "New question" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // user, assistant (partial), user (notice), user (new) - expect(messages).toHaveLength(4) - expect(messages[0]?.role).toBe("user") - expect(messages[1]?.role).toBe("assistant") - expect(messages[2]?.role).toBe("user") - expect(messages[3]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) - - it.effect("handles multiple interruptions in sequence", () => - Effect.gen(function*() { - const events = [ - new UserMessageEvent({ content: "First question" }), - new LLMRequestInterruptedEvent({ - requestId: "test-1", - reason: "user_cancel", - partialResponse: "First partial" - }), - new UserMessageEvent({ content: "Second question" }), - new LLMRequestInterruptedEvent({ - requestId: "test-2", - reason: "user_cancel", - partialResponse: "Second partial" - }), - new UserMessageEvent({ content: "Third question" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // user, assistant, user (notice), user, assistant, user (notice), user - expect(messages).toHaveLength(7) - - let userCount = 0 - let assistantCount = 0 - for (const msg of messages) { - if (msg.role === "user") userCount++ - if (msg.role === "assistant") assistantCount++ - } - - expect(userCount).toBe(5) // 3 real + 2 interruption notices - expect(assistantCount).toBe(2) // 2 partial responses - }).pipe(Effect.provide(testLayer))) - - it.effect("skips empty partial response in interruption", () => - Effect.gen(function*() { - // Edge case: interrupted before any content was generated - const events = [ - new UserMessageEvent({ content: "Question" }), - new LLMRequestInterruptedEvent({ - requestId: "test-empty", - reason: "user_cancel", - partialResponse: "" - }), - new UserMessageEvent({ content: "New question" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // Empty interruption is skipped - only the two user messages remain - expect(messages).toHaveLength(2) - expect(messages[0]?.role).toBe("user") - expect(messages[1]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) -}) diff --git a/src/new-architecture/mini-agent.test.ts b/test/mini-agent.test.ts similarity index 85% rename from src/new-architecture/mini-agent.test.ts rename to test/mini-agent.test.ts index 7c64656..d0f3d50 100644 --- a/src/new-architecture/mini-agent.test.ts +++ b/test/mini-agent.test.ts @@ -11,12 +11,20 @@ */ import { describe, expect, it } from "@effect/vitest" import { Effect, Layer, Option, Ref, Stream } from "effect" -import type { AgentName, AgentTurnNumber, ContextEvent, ContextName, LlmProviderId, ReducedContext } from "./domain.ts" -import { AgentError, EventBuilder, MiniAgentTurn } from "./domain.ts" -import { EventReducer } from "./event-reducer.ts" -import { EventStore } from "./event-store.ts" -import type { ActorState } from "./mini-agent.ts" -import { makeExecuteTurn, makeMiniAgent } from "./mini-agent.ts" +import { AgentRegistry } from "../src/agent-registry.ts" +import type { + AgentName, + AgentTurnNumber, + ContextEvent, + ContextName, + LlmProviderId, + ReducedContext +} from "../src/domain.ts" +import { AgentError, EventBuilder, MiniAgentTurn } from "../src/domain.ts" +import { EventReducer } from "../src/event-reducer.ts" +import { EventStore } from "../src/event-store.ts" +import type { ActorState } from "../src/mini-agent.ts" +import { makeExecuteTurn, makeMiniAgent } from "../src/mini-agent.ts" const testAgentName = "test-agent" as AgentName const testContextName = "test-context" as ContextName @@ -272,3 +280,30 @@ describe("MiniAgent", () => { )) }) }) + +describe("AgentRegistry", () => { + it.effect("creates new agent on getOrCreate", () => + Effect.gen(function*() { + const registry = yield* AgentRegistry + const agent = yield* registry.getOrCreate(testAgentName) + expect(agent.agentName).toBe(testAgentName) + }).pipe(Effect.provide(AgentRegistry.TestLayer))) + + it.effect("returns same agent on second getOrCreate", () => + Effect.gen(function*() { + const registry = yield* AgentRegistry + const agent1 = yield* registry.getOrCreate(testAgentName) + const agent2 = yield* registry.getOrCreate(testAgentName) + expect(agent1.contextName).toBe(agent2.contextName) + }).pipe(Effect.provide(AgentRegistry.TestLayer))) + + it.effect("list returns all agent names", () => + Effect.gen(function*() { + const registry = yield* AgentRegistry + yield* registry.getOrCreate("agent-a" as AgentName) + yield* registry.getOrCreate("agent-b" as AgentName) + const names = yield* registry.list + expect(names).toContain("agent-a") + expect(names).toContain("agent-b") + }).pipe(Effect.provide(AgentRegistry.TestLayer))) +}) diff --git a/test/server.e2e.test.ts b/test/server.e2e.test.ts index c98540e..ac86f76 100644 --- a/test/server.e2e.test.ts +++ b/test/server.e2e.test.ts @@ -140,13 +140,13 @@ describe("HTTP Server", () => { } }) - test("context endpoint processes messages", { timeout: 60000 }, async ({ llmEnv, testDir }) => { + test("agent endpoint processes messages", { timeout: 60000 }, async ({ llmEnv, testDir }) => { const { cleanup, port } = await startServer(testDir, llmEnv) try { - const response = await fetchWithRetry(`http://localhost:${port}/context/test-context`, { + const response = await fetchWithRetry(`http://localhost:${port}/agent/test-context`, { method: "POST", - headers: { "Content-Type": "application/x-ndjson" }, + headers: { "Content-Type": "application/json" }, body: "{\"_tag\":\"UserMessage\",\"content\":\"Say exactly: HELLO_SERVER\"}" }) @@ -156,21 +156,21 @@ describe("HTTP Server", () => { const events = await parseSSE(response) expect(events.length).toBeGreaterThan(0) - // Should have AssistantMessage event - const hasAssistant = events.some((e) => e.includes("\"AssistantMessage\"")) + // Should have AssistantMessageEvent + const hasAssistant = events.some((e) => e.includes("\"AssistantMessageEvent\"")) expect(hasAssistant).toBe(true) } finally { await cleanup() } }) - test("context endpoint returns 400 for empty body", { timeout: 30000 }, async ({ llmEnv, testDir }) => { + test("agent endpoint returns 400 for empty body", { timeout: 30000 }, async ({ llmEnv, testDir }) => { const { cleanup, port } = await startServer(testDir, llmEnv) try { - const response = await fetchWithRetry(`http://localhost:${port}/context/test-context`, { + const response = await fetchWithRetry(`http://localhost:${port}/agent/test-context`, { method: "POST", - headers: { "Content-Type": "application/x-ndjson" }, + headers: { "Content-Type": "application/json" }, body: "" }) diff --git a/test/services.unit.test.ts b/test/services.unit.test.ts deleted file mode 100644 index 5738a33..0000000 --- a/test/services.unit.test.ts +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Service Unit Tests - * - * Tests services using testLayer pattern for isolated unit testing. - * See: https://www.effect.solutions/testing - * - * Pattern: Each test provides a fresh layer so state never leaks between tests. - */ -import { describe, expect, it } from "@effect/vitest" -import { Effect } from "effect" -import { SystemPromptEvent, UserMessageEvent } from "../src/context.model.js" -import { ContextRepository } from "../src/context.repository.js" - -// ============================================================================= -// ContextRepository Tests -// ============================================================================= - -describe("ContextRepository", () => { - describe("load", () => { - it.effect("returns empty array for non-existent context", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const events = yield* repo.load("non-existent") - expect(events).toEqual([]) - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("returns saved events after save", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const events = [ - new SystemPromptEvent({ content: "Test system prompt" }), - new UserMessageEvent({ content: "Hello" }) - ] - - yield* repo.save("test-context", events) - const loaded = yield* repo.load("test-context") - - expect(loaded).toHaveLength(2) - expect(loaded[0]?._tag).toBe("SystemPrompt") - expect(loaded[1]?._tag).toBe("UserMessage") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("loadOrCreate", () => { - it.effect("creates context with system prompt if not exists", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const events = yield* repo.loadOrCreate("new-context") - - expect(events).toHaveLength(1) - expect(events[0]?._tag).toBe("SystemPrompt") - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("returns existing events if context exists", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const initial = [new SystemPromptEvent({ content: "Custom prompt" })] - yield* repo.save("existing", initial) - - const events = yield* repo.loadOrCreate("existing") - - expect(events).toHaveLength(1) - expect(events[0]?._tag).toBe("SystemPrompt") - expect(events[0]?._tag === "SystemPrompt" && events[0].content).toBe("Custom prompt") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("list", () => { - it.effect("returns empty array when no contexts", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const contexts = yield* repo.list() - expect(contexts).toEqual([]) - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("returns sorted context names", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - yield* repo.save("zebra", [new SystemPromptEvent({ content: "z" })]) - yield* repo.save("alpha", [new SystemPromptEvent({ content: "a" })]) - - const contexts = yield* repo.list() - - expect(contexts).toEqual(["alpha", "zebra"]) - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("save", () => { - it.effect("overwrites existing context", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const initial = [new SystemPromptEvent({ content: "First" })] - const updated = [ - new SystemPromptEvent({ content: "Second" }), - new UserMessageEvent({ content: "Hello" }) - ] - - yield* repo.save("overwrite-test", initial) - yield* repo.save("overwrite-test", updated) - - const loaded = yield* repo.load("overwrite-test") - expect(loaded).toHaveLength(2) - expect(loaded[0]?._tag === "SystemPrompt" && loaded[0].content).toBe("Second") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("getContextsDir", () => { - it.effect("returns test directory path", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const dir = repo.getContextsDir() - expect(dir).toBe("/test/contexts") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) -}) - -// ============================================================================= -// State Isolation Tests -// ============================================================================= - -describe("Test Isolation", () => { - it.effect("first test saves data", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - yield* repo.save("isolation-test", [ - new SystemPromptEvent({ content: "First test data" }) - ]) - - const loaded = yield* repo.load("isolation-test") - expect(loaded).toHaveLength(1) - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("second test has fresh state (no leakage)", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - // Should not see data from first test - const loaded = yield* repo.load("isolation-test") - expect(loaded).toEqual([]) - }).pipe(Effect.provide(ContextRepository.testLayer))) -})