diff --git a/README.md b/README.md index 44b6105..b595e47 100644 --- a/README.md +++ b/README.md @@ -78,20 +78,23 @@ LLM='{"apiFormat":"openai-chat-completions","model":"my-model","baseUrl":"https: ## Event Types -See [`src/context.model.ts`](src/context.model.ts) for schema definitions. +See [`src/domain.ts`](src/domain.ts) for schema definitions. **Input Events** (via stdin in script mode): -- `UserMessage` - User message content -- `SystemPrompt` - System behavior configuration -- `FileAttachment` - Image or file attachment +- `UserMessage` - User message content (output as `UserMessageEvent`) +- `SystemPrompt` - System behavior configuration (output as `SystemPromptEvent`) **Output Events**: -- `TextDelta` - Streaming chunk (ephemeral) -- `AssistantMessage` - Complete response (persisted) - -**Internal Events** (persisted): -- `SetLlmConfig` - LLM configuration for context -- `LLMRequestInterrupted` - Partial response on cancellation +- `TextDeltaEvent` - Streaming chunk (ephemeral) +- `AssistantMessageEvent` - Complete response (persisted) +- `AgentTurnStartedEvent` - LLM turn started +- `AgentTurnCompletedEvent` - LLM turn completed +- `AgentTurnInterruptedEvent` - Turn interrupted (partial response) + +**Lifecycle Events** (persisted): +- `SessionStartedEvent` - Agent session started +- `SessionEndedEvent` - Agent session ended +- `SetLlmConfigEvent` - LLM configuration for context ## Script Mode diff --git a/src/new-architecture/agent-registry.test.ts b/src/agent-registry.test.ts similarity index 100% rename from src/new-architecture/agent-registry.test.ts rename to src/agent-registry.test.ts diff --git a/src/new-architecture/agent-registry.ts b/src/agent-registry.ts similarity index 94% rename from src/new-architecture/agent-registry.ts rename to src/agent-registry.ts index 5da6a27..192ddf1 100644 --- a/src/new-architecture/agent-registry.ts +++ b/src/agent-registry.ts @@ -150,6 +150,13 @@ export class AgentRegistry extends Effect.Service<AgentRegistry>()("@mini-agent/ Effect.map((map) => Array.from(map.keys())) ) + // List all contexts from EventStore (includes persisted contexts not yet loaded) + const listContexts: Effect.Effect<ReadonlyArray<AgentName>> = Effect.gen(function*() { + const contextNames = yield* store.list() + // Convert context names back to agent names (remove "-v1" suffix) + return contextNames.map((cn) => cn.replace(/-v1$/, "") as AgentName) + }) + const shutdownAgent = (agentName: AgentName): Effect.Effect<void> => Effect.gen(function*() { const current = yield* Ref.get(agents) @@ -190,6 +197,7 @@ export class AgentRegistry extends Effect.Service<AgentRegistry>()("@mini-agent/ getOrCreate, get, list, + listContexts, shutdownAgent, shutdownAll } diff --git a/src/cli/chat-ui.ts b/src/cli/chat-ui.ts index 4a6b290..4b595ea 100644 --- a/src/cli/chat-ui.ts +++ b/src/cli/chat-ui.ts @@ -4,22 +4,23 @@ * Interactive chat with interruptible LLM streaming. * Return during streaming interrupts (with optional new message); Escape exits.
*/ -import type { AiError, LanguageModel } from "@effect/ai" -import type { Error as PlatformError, FileSystem } from "@effect/platform" -import { Cause, Context, Effect, Fiber, Layer, Mailbox, Stream } from "effect" +import type { Error as PlatformError } from "@effect/platform" +import { Cause, Context, Effect, Fiber, Layer, Mailbox, Option, Stream } from "effect" import { is } from "effect/Schema" +import { AgentRegistry } from "../agent-registry.ts" import { + type AgentName, + type AgentTurnNumber, AssistantMessageEvent, type ContextEvent, - LLMRequestInterruptedEvent, - TextDeltaEvent, - UserMessageEvent -} from "../context.model.ts" -import { ContextService } from "../context.service.ts" -import type { ContextLoadError, ContextSaveError } from "../errors.ts" -import type { CurrentLlmConfig } from "../llm-config.ts" -import { streamLLMResponse } from "../llm.ts" -import { type ChatController, runOpenTUIChat } from "./components/opentui-chat.tsx" + type ContextName, + type ContextSaveError, + DEFAULT_SYSTEM_PROMPT, + EventBuilder, + type ReducerError, + TextDeltaEvent +} from "../domain.ts" +import { type ChatController, type DisplayEvent, runOpenTUIChat } from "./components/opentui-chat.tsx" type ChatSignal = | { readonly _tag: "Input"; readonly text: string } @@ -32,23 +33,59 @@ export class ChatUI extends Context.Tag("@app/ChatUI")< contextName: string ) => Effect.Effect< void, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig + PlatformError.PlatformError | ReducerError | ContextSaveError, + AgentRegistry > } >() { static readonly layer = Layer.effect( ChatUI, Effect.gen(function*() { - const contextService = yield* ContextService + const registry = yield* AgentRegistry const runChat = Effect.fn("ChatUI.runChat")(function*(contextName: string) { - const existingEvents = yield* contextService.load(contextName) + const agentName = contextName as AgentName + const agent = yield* registry.getOrCreate(agentName) + + // Check if context needs initialization + const ctx = yield* agent.getReducedContext + if (ctx.messages.length === 0) { + const systemEvent = EventBuilder.systemPrompt( + agentName, + agent.contextName, + ctx.nextEventNumber, + DEFAULT_SYSTEM_PROMPT + ) + yield* agent.addEvent(systemEvent) + } + + // Get events for display + const existingEvents = yield* agent.getEvents + + // Convert domain events to display events for the UI + const displayEvents: Array<DisplayEvent> = existingEvents + .map((e): DisplayEvent | null => { + switch (e._tag) { + case "UserMessageEvent": + return { _tag: "UserMessage", content: e.content } + case "AssistantMessageEvent": + return { _tag: "AssistantMessage", content: e.content } + case "SystemPromptEvent": + return { _tag: "SystemPrompt", content: e.content } + case "AgentTurnInterruptedEvent": + return Option.isSome(e.partialResponse) + ?
{ _tag: "LLMRequestInterrupted", partialResponse: e.partialResponse.value, reason: e.reason } + : null + default: + return null + } + }) + .filter((e): e is DisplayEvent => e !== null) const mailbox = yield* Mailbox.make() const chat = yield* Effect.promise(() => - runOpenTUIChat(contextName, existingEvents, { + runOpenTUIChat(contextName, displayEvents, { onSubmit: (text) => { mailbox.unsafeOffer({ _tag: "Input", text }) }, @@ -58,7 +95,7 @@ export class ChatUI extends Context.Tag("@app/ChatUI")< }) ) - yield* runChatLoop(contextName, contextService, chat, mailbox).pipe( + yield* runChatLoop(agentName, agent, chat, mailbox).pipe( Effect.catchAll((error) => Effect.logError("Chat error", { error }).pipe(Effect.as(undefined))), Effect.catchAllCause((cause) => Cause.isInterruptedOnly(cause) ? Effect.void : Effect.logError("Chat loop error", cause) @@ -74,19 +111,22 @@ export class ChatUI extends Context.Tag("@app/ChatUI")< static readonly testLayer = Layer.sync(ChatUI, () => ChatUI.of({ runChat: () => Effect.void })) } +interface AgentInterface { + readonly addEvent: (event: ContextEvent) => Effect.Effect + readonly events: Stream.Stream + readonly getReducedContext: Effect.Effect<{ nextEventNumber: number; currentTurnNumber: AgentTurnNumber }, never> + readonly contextName: ContextName +} + const runChatLoop = ( - contextName: string, - contextService: Context.Tag.Service, + agentName: AgentName, + agent: AgentInterface, chat: ChatController, mailbox: Mailbox.Mailbox -): Effect.Effect< - void, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig -> => +): Effect.Effect => Effect.fn("ChatUI.runChatLoop")(function*() { while (true) { - const result = yield* runChatTurn(contextName, contextService, chat, mailbox, null) + const result = yield* runChatTurn(agentName, agent, chat, mailbox, null) if (result._tag === "exit") { return } @@ -98,16 +138,12 @@ type TurnResult = | { readonly _tag: "exit" } const runChatTurn = ( - contextName: string, - contextService: Context.Tag.Service, + agentName: AgentName, + agent: AgentInterface, chat: ChatController, mailbox: Mailbox.Mailbox, pendingMessage: string | null -): Effect.Effect< - TurnResult, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig -> => +): Effect.Effect => Effect.fn("ChatUI.runChatTurn")(function*() { // Get message either from pending or by waiting for input let userMessage: string @@ -128,31 +164,35 @@ const runChatTurn = ( return { _tag: "continue" } as const } - const userEvent = new UserMessageEvent({ content: userMessage }) + const ctx = yield* agent.getReducedContext - yield* contextService.persistEvent(contextName, userEvent) - chat.addEvent(userEvent) + // Create and add user event + const userEvent = EventBuilder.userMessage( + agentName, + agent.contextName, + ctx.nextEventNumber, + userMessage + ) + yield* agent.addEvent(userEvent) + + // Show in UI + chat.addEvent({ _tag: "UserMessage", content: userMessage }) - const events = yield* contextService.load(contextName) let accumulatedText = "" const streamFiber = yield* Effect.fork( - streamLLMResponse(events).pipe( + agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), Stream.tap((event: ContextEvent) => Effect.sync(() => { if (is(TextDeltaEvent)(event)) { accumulatedText += event.delta - 
chat.addEvent(event) + chat.addEvent({ _tag: "TextDelta", delta: event.delta }) + } else if (is(AssistantMessageEvent)(event)) { + chat.addEvent({ _tag: "AssistantMessage", content: event.content }) } }) ), - Stream.filter(is(AssistantMessageEvent)), - Stream.tap((event) => - Effect.gen(function*() { - yield* contextService.persistEvent(contextName, event) - chat.addEvent(event) - }) - ), Stream.runDrain ) ) @@ -165,30 +205,42 @@ const runChatTurn = ( if (result._tag === "exit") { if (accumulatedText.length > 0) { - const interruptedEvent = new LLMRequestInterruptedEvent({ - requestId: crypto.randomUUID(), - reason: "user_cancel", - partialResponse: accumulatedText - }) - yield* contextService.persistEvent(contextName, interruptedEvent) - chat.addEvent(interruptedEvent) + const interruptedCtx = yield* agent.getReducedContext + const interruptedEvent = EventBuilder.agentTurnInterrupted( + agentName, + agent.contextName, + interruptedCtx.nextEventNumber, + interruptedCtx.currentTurnNumber, + "user_cancel", + accumulatedText + ) + yield* agent.addEvent(interruptedEvent).pipe(Effect.catchAll(() => Effect.void)) + chat.addEvent({ _tag: "LLMRequestInterrupted", partialResponse: accumulatedText, reason: "user_cancel" }) } return { _tag: "exit" } as const } // result._tag === "interrupted" - user hit return during streaming if (accumulatedText.length > 0) { - const interruptedEvent = new LLMRequestInterruptedEvent({ - requestId: crypto.randomUUID(), - reason: result.newMessage ? "user_new_message" : "user_cancel", - partialResponse: accumulatedText + const interruptedCtx = yield* agent.getReducedContext + const interruptedEvent = EventBuilder.agentTurnInterrupted( + agentName, + agent.contextName, + interruptedCtx.nextEventNumber, + interruptedCtx.currentTurnNumber, + result.newMessage ? "user_new_message" : "user_cancel", + accumulatedText + ) + yield* agent.addEvent(interruptedEvent).pipe(Effect.catchAll(() => Effect.void)) + chat.addEvent({ + _tag: "LLMRequestInterrupted", + partialResponse: accumulatedText, + reason: result.newMessage ? 
"user_new_message" : "user_cancel" }) - yield* contextService.persistEvent(contextName, interruptedEvent) - chat.addEvent(interruptedEvent) } if (result.newMessage) { - return yield* runChatTurn(contextName, contextService, chat, mailbox, result.newMessage) + return yield* runChatTurn(agentName, agent, chat, mailbox, result.newMessage) } return { _tag: "continue" } as const @@ -200,9 +252,9 @@ type StreamResult = | { readonly _tag: "interrupted"; readonly newMessage: string | null } const awaitStreamCompletion = ( - fiber: Fiber.RuntimeFiber, + fiber: Fiber.RuntimeFiber, mailbox: Mailbox.Mailbox -): Effect.Effect => +): Effect.Effect => Effect.fn("ChatUI.awaitStreamCompletion")(function*() { const waitForFiber = Fiber.join(fiber).pipe(Effect.as({ _tag: "completed" } as StreamResult)) const waitForInterrupt = Effect.gen(function*() { diff --git a/src/cli/commands.ts b/src/cli/commands.ts index 92f939e..4cbd6d0 100644 --- a/src/cli/commands.ts +++ b/src/cli/commands.ts @@ -5,22 +5,23 @@ */ import { type Prompt, Telemetry } from "@effect/ai" import { Command, Options, Prompt as CliPrompt } from "@effect/cli" -import { type Error as PlatformError, FileSystem, HttpServer, Terminal } from "@effect/platform" +import { FileSystem, HttpServer, Terminal } from "@effect/platform" import { BunHttpServer, BunStream } from "@effect/platform-bun" -import { Chunk, Console, Effect, Layer, Option, Schema, Stream } from "effect" +import { Chunk, Console, Effect, Fiber, Layer, Option, Schema, Stream } from "effect" +import { AgentRegistry } from "../agent-registry.ts" import { AppConfig, resolveBaseDir } from "../config.ts" import { + type AgentName, AssistantMessageEvent, - type ContextEvent, - FileAttachmentEvent, - type InputEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "../context.model.ts" -import { ContextService } from "../context.service.ts" + DEFAULT_SYSTEM_PROMPT, + EventBuilder, + TextDeltaEvent +} from "../domain.ts" +import { EventReducer } from "../event-reducer.ts" +import { EventStoreFileSystem } from "../event-store-fs.ts" import { makeRouter } from "../http.ts" import { layercodeCommand } from "../layercode/index.ts" +import { LlmTurnLive } from "../llm-turn.ts" import { AgentServer } from "../server.service.ts" import { printTraceLinks } from "../tracing.ts" @@ -91,98 +92,70 @@ const imageOption = Options.text("image").pipe( Options.optional ) -const MIME_TYPES: Record = { - ".png": "image/png", - ".jpg": "image/jpeg", - ".jpeg": "image/jpeg", - ".gif": "image/gif", - ".webp": "image/webp" -} - -const getMediaType = (filePath: string): string => { - const ext = filePath.toLowerCase().slice(filePath.lastIndexOf(".")) - return MIME_TYPES[ext] ?? "application/octet-stream" -} - -const getFileName = (filePath: string): string => { - const lastSlash = Math.max(filePath.lastIndexOf("/"), filePath.lastIndexOf("\\")) - return lastSlash >= 0 ? filePath.slice(lastSlash + 1) : filePath -} - -const isUrl = (input: string): boolean => input.startsWith("http://") || input.startsWith("https://") - interface OutputOptions { raw: boolean showEphemeral: boolean } -/** - * Handle a single context event based on output options. 
- */ -const handleEvent = ( - event: ContextEvent, - options: OutputOptions -): Effect.Effect => - Effect.gen(function*() { - const terminal = yield* Terminal.Terminal - - if (options.raw) { - if (Schema.is(TextDeltaEvent)(event) && !options.showEphemeral) { - return - } - yield* Console.log(JSON.stringify(event)) - return - } - - if (Schema.is(TextDeltaEvent)(event)) { - yield* terminal.display(event.delta) - return - } - if (Schema.is(AssistantMessageEvent)(event)) { - yield* Console.log("") - return - } - }) - -/** Run the event stream, handling each event */ +/** Run the event stream using the new architecture */ const runEventStream = ( contextName: string, userMessage: string, - options: OutputOptions, - imageInput?: string + options: OutputOptions ) => Effect.gen(function*() { - const contextService = yield* ContextService - const inputEvents: Array = [] - - if (imageInput) { - const mediaType = getMediaType(imageInput) - const fileName = getFileName(imageInput) - - if (isUrl(imageInput)) { - inputEvents.push( - new FileAttachmentEvent({ - source: { type: "url", url: imageInput }, - mediaType, - fileName - }) - ) - } else { - inputEvents.push( - new FileAttachmentEvent({ - source: { type: "file", path: imageInput }, - mediaType, - fileName - }) - ) - } + const registry = yield* AgentRegistry + const terminal = yield* Terminal.Terminal + + const agentName = contextName as AgentName + const agent = yield* registry.getOrCreate(agentName) + const ctx = yield* agent.getReducedContext + + // If this is a new context, add default system prompt + if (ctx.messages.length === 0) { + const systemEvent = EventBuilder.systemPrompt( + agentName, + agent.contextName, + ctx.nextEventNumber, + DEFAULT_SYSTEM_PROMPT + ) + yield* agent.addEvent(systemEvent) } - inputEvents.push(new UserMessageEvent({ content: userMessage })) + // Get updated context after potential system prompt + const updatedCtx = yield* agent.getReducedContext + + // Subscribe to events BEFORE adding the user message + const streamFiber = yield* agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((event) => { + if (options.raw) { + if (Schema.is(TextDeltaEvent)(event) && !options.showEphemeral) { + return Effect.void + } + return Console.log(JSON.stringify(event)) + } else if (Schema.is(TextDeltaEvent)(event)) { + return terminal.display(event.delta) + } else if (Schema.is(AssistantMessageEvent)(event)) { + return Console.log("") + } + return Effect.void + }), + Stream.runDrain, + Effect.fork + ) - yield* contextService.addEvents(contextName, inputEvents).pipe( - Stream.runForEach((event) => handleEvent(event, options)) + // Add user message with triggersAgentTurn=true + const userEvent = EventBuilder.userMessage( + agentName, + agent.contextName, + updatedCtx.nextEventNumber, + userMessage ) + yield* agent.addEvent(userEvent) + + // Wait for stream to complete + yield* Fiber.join(streamFiber) }) /** CLI interaction mode - determines how input/output is handled */ @@ -210,7 +183,10 @@ const readAllStdin: Effect.Effect = BunStream.stdin.pipe( Effect.map((chunks) => Chunk.join(chunks, "").trim()) ) -const ScriptInputEvent = Schema.Union(UserMessageEvent, SystemPromptEvent) +const ScriptInputEvent = Schema.Union( + Schema.Struct({ _tag: Schema.Literal("UserMessage"), content: Schema.String }), + Schema.Struct({ _tag: Schema.Literal("SystemPrompt"), content: Schema.String }) +) const stdinEvents = BunStream.stdin.pipe( Stream.mapChunks(Chunk.map((bytes) 
=> utf8Decoder.decode(bytes))), @@ -225,19 +201,59 @@ const stdinEvents = BunStream.stdin.pipe( const scriptInteractiveLoop = (contextName: string, options: OutputOptions) => Effect.gen(function*() { - const contextService = yield* ContextService + const registry = yield* AgentRegistry + const terminal = yield* Terminal.Terminal + + const agentName = contextName as AgentName + const agent = yield* registry.getOrCreate(agentName) yield* stdinEvents.pipe( Stream.mapEffect((event) => Effect.gen(function*() { yield* Console.log(JSON.stringify(event)) - if (Schema.is(UserMessageEvent)(event)) { - yield* contextService.addEvents(contextName, [event]).pipe( - Stream.runForEach((outputEvent) => handleEvent(outputEvent, options)) + if (event._tag === "UserMessage") { + const ctx = yield* agent.getReducedContext + + // Subscribe to events BEFORE adding the user message + const streamFiber = yield* agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((outputEvent) => { + if (options.raw) { + if (Schema.is(TextDeltaEvent)(outputEvent) && !options.showEphemeral) { + return Effect.void + } + return Console.log(JSON.stringify(outputEvent)) + } else if (Schema.is(TextDeltaEvent)(outputEvent)) { + return terminal.display(outputEvent.delta) + } else if (Schema.is(AssistantMessageEvent)(outputEvent)) { + return Console.log("") + } + return Effect.void + }), + Stream.runDrain, + Effect.fork + ) + + // Add user message + const userEvent = EventBuilder.userMessage( + agentName, + agent.contextName, + ctx.nextEventNumber, + event.content + ) + yield* agent.addEvent(userEvent) + + yield* Fiber.join(streamFiber) + } else if (event._tag === "SystemPrompt") { + const ctx = yield* agent.getReducedContext + const systemEvent = EventBuilder.systemPrompt( + agentName, + agent.contextName, + ctx.nextEventNumber, + event.content ) - } else if (Schema.is(SystemPromptEvent)(event)) { - yield* Effect.logDebug("SystemPrompt events in script mode are echoed but not persisted") + yield* agent.addEvent(systemEvent) } }) ), @@ -248,8 +264,8 @@ const scriptInteractiveLoop = (contextName: string, options: OutputOptions) => const NEW_CONTEXT_VALUE = "__new__" const selectOrCreateContext = Effect.gen(function*() { - const contextService = yield* ContextService - const contexts = yield* contextService.list() + const registry = yield* AgentRegistry + const contexts = yield* registry.listContexts if (contexts.length === 0) { yield* Console.log("No existing contexts found.") @@ -295,6 +311,13 @@ const makeChatUILayer = () => }) ) +const makeAgentLayer = () => + AgentRegistry.Default.pipe( + Layer.provide(LlmTurnLive), + Layer.provide(EventStoreFileSystem), + Layer.provide(EventReducer.Default) + ) + const runChat = (options: { name: Option.Option message: Option.Option @@ -307,7 +330,6 @@ const runChat = (options: { yield* Effect.logDebug("Starting chat session") const mode = determineMode(options) const contextName = Option.getOrElse(options.name, generateRandomContextName) - const imagePath = Option.getOrNull(options.image) ?? 
undefined const outputOptions: OutputOptions = { raw: mode === "script" || options.raw, @@ -317,7 +339,7 @@ const runChat = (options: { switch (mode) { case "single-turn": { const message = Option.getOrElse(options.message, () => "") - yield* runEventStream(contextName, message, outputOptions, imagePath) + yield* runEventStream(contextName, message, outputOptions) if (!outputOptions.raw) { yield* printTraceLinks } @@ -327,7 +349,7 @@ const runChat = (options: { case "pipe": { const input = yield* readAllStdin if (input !== "") { - yield* runEventStream(contextName, input, { raw: false, showEphemeral: false }, imagePath) + yield* runEventStream(contextName, input, { raw: false, showEphemeral: false }) } break } @@ -353,7 +375,7 @@ const runChat = (options: { } } }).pipe( - Effect.provide(makeChatUILayer()), + Effect.provide(Layer.merge(makeChatUILayer(), makeAgentLayer())), Effect.withSpan("chat-session") ) @@ -517,10 +539,13 @@ export const serveCommand = Command.make( // Create server layer with configured port/host const serverLayer = BunHttpServer.layer({ port: actualPort, hostname: actualHost }) - // Create layers for the server + // AgentServer layer - uses AgentRegistry from context + const agentServerLayer = AgentServer.layer + + // Merge layers for the server const layers = Layer.mergeAll( serverLayer, - AgentServer.layer + agentServerLayer ) // Use Layer.launch to keep the server running diff --git a/src/cli/components/opentui-chat.tsx b/src/cli/components/opentui-chat.tsx index a037cca..31fd12c 100644 --- a/src/cli/components/opentui-chat.tsx +++ b/src/cli/components/opentui-chat.tsx @@ -2,7 +2,7 @@ * OpenTUI Chat Component * * Architecture: - * - ContextEvent[] dispatched via controller.addEvent() + * - DisplayEvent[] dispatched via controller.addEvent() * - feedReducer folds each event into FeedItem[] (accumulated state) * - Feed component renders feedItems (pure render, knows nothing about events) * @@ -15,8 +15,23 @@ import { Option, Schema } from "effect" import { createCliRenderer, TextAttributes } from "@opentui/core" import { createRoot } from "@opentui/react/renderer" import { memo, useCallback, useMemo, useReducer, useRef, useState } from "react" -import type { ContextEvent, PersistedEvent } from "../../context.model.ts" -import { AttachmentSource } from "../../context.model.ts" + +/** Simple display events for the UI - decoupled from domain events */ +export type DisplayEvent = + | { _tag: "UserMessage"; content: string } + | { _tag: "AssistantMessage"; content: string } + | { _tag: "SystemPrompt"; content: string } + | { _tag: "TextDelta"; delta: string } + | { _tag: "LLMRequestInterrupted"; partialResponse: string; reason: string } + | { _tag: "FileAttachment"; source: AttachmentSource; fileName?: string } + | { _tag: "SetLlmConfig" } + +/** Attachment source - local file path or remote URL */ +const AttachmentSource = Schema.Union( + Schema.Struct({ type: Schema.Literal("file"), path: Schema.String }), + Schema.Struct({ type: Schema.Literal("url"), url: Schema.String }) +) +type AttachmentSource = typeof AttachmentSource.Type /** User's message in the conversation */ class UserMessageItem extends Schema.TaggedClass()("UserMessageItem", { @@ -71,7 +86,7 @@ const FeedItem = Schema.Union( ) type FeedItem = typeof FeedItem.Type -type FeedAction = { event: ContextEvent; isHistory: boolean } +type FeedAction = { event: DisplayEvent; isHistory: boolean } /** * Folds a context event into accumulated feed items. 
@@ -310,13 +325,13 @@ } export interface ChatController { - addEvent: (event: ContextEvent) => void + addEvent: (event: DisplayEvent) => void cleanup: () => void } interface ChatAppProps { contextName: string - initialEvents: PersistedEvent[] + initialEvents: DisplayEvent[] callbacks: ChatCallbacks controllerRef: React.MutableRefObject<ChatController | null> } @@ -353,7 +368,7 @@ function ChatApp({ contextName, initialEvents, callbacks, controllerRef }: ChatA // Set up controller synchronously during first render if (!controllerRef.current) { controllerRef.current = { - addEvent(event: ContextEvent) { + addEvent(event: DisplayEvent) { dispatchRef.current({ event, isHistory: false }) }, cleanup() { @@ -421,7 +436,7 @@ function ChatApp({ contextName, initialEvents, callbacks, controllerRef }: ChatA export async function runOpenTUIChat( contextName: string, - initialEvents: PersistedEvent[], + initialEvents: DisplayEvent[], callbacks: ChatCallbacks ): Promise<ChatController> { let exitSignaled = false @@ -462,7 +477,7 @@ export async function runOpenTUIChat( renderer.start() return { - addEvent(event: ContextEvent) { + addEvent(event: DisplayEvent) { controllerRef.current?.addEvent(event) }, cleanup() { diff --git a/src/cli/main.ts b/src/cli/main.ts index 06a43a6..132d2cd 100644 --- a/src/cli/main.ts +++ b/src/cli/main.ts @@ -7,6 +7,7 @@ import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai" import { FetchHttpClient } from "@effect/platform" import { BunContext, BunRuntime } from "@effect/platform-bun" import { Cause, Effect, Layer } from "effect" +import { AgentRegistry } from "../agent-registry.ts" import { AppConfig, extractConfigPath, @@ -15,9 +16,10 @@ import { type MiniAgentConfig as MiniAgentConfigType, resolveBaseDir } from "../config.ts" -import { ContextRepository } from "../context.repository.ts" -import { ContextService } from "../context.service.ts" +import { EventReducer } from "../event-reducer.ts" +import { EventStoreFileSystem } from "../event-store-fs.ts" import { CurrentLlmConfig, getApiKey, type LlmConfig, resolveLlmConfig } from "../llm-config.ts" +import { LlmTurnLive } from "../llm-turn.ts" import { createLoggingLayer } from "../logging.ts" import { OpenAiChatClient, OpenAiChatLanguageModel } from "../openai-chat-completions-client.ts" import { createTracingLayer } from "../tracing.ts" @@ -97,8 +99,14 @@ const makeMainLayer = (args: ReadonlyArray<string>) => const languageModelLayer = makeLanguageModelLayer(llmConfig) const tracingLayer = createTracingLayer("mini-agent") - return ContextService.layer.pipe( - Layer.provideMerge(ContextRepository.layer), + // New architecture layers + const agentLayer = AgentRegistry.Default.pipe( + Layer.provide(LlmTurnLive), + Layer.provide(EventStoreFileSystem), + Layer.provide(EventReducer.Default) + ) + + return agentLayer.pipe( Layer.provideMerge(languageModelLayer), Layer.provideMerge(llmConfigLayer), Layer.provideMerge(tracingLayer), diff --git a/src/context.model.ts b/src/context.model.ts deleted file mode 100644 index 532faff..0000000 --- a/src/context.model.ts +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Context Event Schemas - * - * A Context is the central concept in this codebase: a named, ordered list of events. - * Events represent conversation turns between user and assistant, plus system configuration.
- * - * Event Types: - * - SystemPrompt: Initial AI behavior configuration (persisted) - * - UserMessage: Input from the user (persisted) - * - AssistantMessage: Complete response from the AI (persisted) - * - TextDelta: Streaming chunk (ephemeral, never persisted) - * - SetLlmConfig: LLM configuration for this context (persisted) - */ -import { Schema } from "effect" -import { LlmConfig } from "./llm-config.ts" - -/** Branded type for context names - prevents mixing with other strings */ -export const ContextName = Schema.String.pipe(Schema.brand("ContextName")) -export type ContextName = typeof ContextName.Type - -/** Message format for LLM APIs and tracing */ -export interface LLMMessage { - readonly role: "system" | "user" | "assistant" - readonly content: string -} - -/** System prompt event - sets the AI's behavior */ -export class SystemPromptEvent extends Schema.TaggedClass()("SystemPrompt", { - content: Schema.String -}) { - toLLMMessage(): LLMMessage { - return { role: "system", content: this.content } - } -} - -/** User message event - input from the user */ -export class UserMessageEvent extends Schema.TaggedClass()("UserMessage", { - content: Schema.String -}) { - toLLMMessage(): LLMMessage { - return { role: "user", content: this.content } - } -} - -/** Assistant message event - complete response from the AI */ -export class AssistantMessageEvent extends Schema.TaggedClass()("AssistantMessage", { - content: Schema.String -}) { - toLLMMessage(): LLMMessage { - return { role: "assistant", content: this.content } - } -} - -/** Text delta event - streaming chunk (ephemeral, never persisted) */ -export class TextDeltaEvent extends Schema.TaggedClass()("TextDelta", { - delta: Schema.String -}) {} - -/** Reason for LLM request interruption */ -export const InterruptReason = Schema.Literal("user_cancel", "user_new_message", "timeout") -export type InterruptReason = typeof InterruptReason.Type - -/** Emitted when LLM request is interrupted - persisted because it contains partial response */ -export class LLMRequestInterruptedEvent - extends Schema.TaggedClass()("LLMRequestInterrupted", { - requestId: Schema.String, - reason: InterruptReason, - partialResponse: Schema.String - }) -{ - toLLMMessage(): LLMMessage { - return { role: "assistant", content: this.partialResponse } - } -} - -/** Attachment source - local file path or remote URL */ -export const AttachmentSource = Schema.Union( - Schema.Struct({ type: Schema.Literal("file"), path: Schema.String }), - Schema.Struct({ type: Schema.Literal("url"), url: Schema.String }) -) -export type AttachmentSource = typeof AttachmentSource.Type - -/** File attachment event - image or other file shared with AI */ -export class FileAttachmentEvent extends Schema.TaggedClass()( - "FileAttachment", - { - source: AttachmentSource, - mediaType: Schema.String, - fileName: Schema.optional(Schema.String) - } -) {} - -/** Sets the LLM config for this context. Added when context is created. 
*/ -export class SetLlmConfigEvent extends Schema.TaggedClass()( - "SetLlmConfig", - { config: LlmConfig } -) {} - -/** Events that get persisted to the context file */ -export const PersistedEvent = Schema.Union( - SystemPromptEvent, - UserMessageEvent, - AssistantMessageEvent, - LLMRequestInterruptedEvent, - FileAttachmentEvent, - SetLlmConfigEvent -) -export type PersistedEvent = typeof PersistedEvent.Type - -/** All possible context events (persisted + ephemeral) */ -export const ContextEvent = Schema.Union( - SystemPromptEvent, - UserMessageEvent, - AssistantMessageEvent, - LLMRequestInterruptedEvent, - FileAttachmentEvent, - SetLlmConfigEvent, - TextDeltaEvent -) -export type ContextEvent = typeof ContextEvent.Type - -/** Input events that can be added via addEvents */ -export const InputEvent = Schema.Union(UserMessageEvent, FileAttachmentEvent, SystemPromptEvent) -export type InputEvent = typeof InputEvent.Type - -export const DEFAULT_SYSTEM_PROMPT = `You are a helpful, friendly assistant. -Keep your responses concise but informative. -Use markdown formatting when helpful.` diff --git a/src/context.repository.ts b/src/context.repository.ts deleted file mode 100644 index d4fbe4c..0000000 --- a/src/context.repository.ts +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Context Repository - * - * Handles file I/O for context persistence. Contexts are stored as YAML files - * in the configured data storage directory. - */ -import { FileSystem, Path } from "@effect/platform" -import { Context, Effect, Layer, Option, Schema } from "effect" -import * as YAML from "yaml" -import { AppConfig } from "./config.ts" -import { - ContextName, - DEFAULT_SYSTEM_PROMPT, - PersistedEvent, - type PersistedEvent as PersistedEventType, - SystemPromptEvent -} from "./context.model.ts" -import { ContextLoadError, ContextSaveError } from "./errors.ts" - -/** - * Decode a plain object to a PersistedEvent class instance. - * This ensures the event has all the methods defined on the class. - */ -const decodeEvent = Schema.decodeUnknownSync(PersistedEvent) - -/** - * Decode an array of plain objects to PersistedEvent class instances. - */ -const decodeEvents = (rawEvents: Array): Array => rawEvents.map((raw) => decodeEvent(raw)) - -/** - * Encode an event to a plain object for YAML serialization. - */ -const encodeEvent = Schema.encodeSync(PersistedEvent) - -export class ContextRepository extends Context.Tag("@app/ContextRepository")< - ContextRepository, - { - readonly load: (contextName: string) => Effect.Effect, ContextLoadError> - readonly loadOrCreate: ( - contextName: string - ) => Effect.Effect, ContextLoadError | ContextSaveError> - readonly save: ( - contextName: string, - events: ReadonlyArray - ) => Effect.Effect - /** Append events to existing context (load + save atomically) */ - readonly append: ( - contextName: string, - events: ReadonlyArray - ) => Effect.Effect - readonly list: () => Effect.Effect, ContextLoadError> - readonly getContextsDir: () => string - } ->() { - /** - * Production layer with file system persistence. 
- */ - static readonly layer = Layer.effect( - ContextRepository, - Effect.gen(function*() { - const fs = yield* FileSystem.FileSystem - const path = yield* Path.Path - const config = yield* AppConfig - - // Resolve the contexts directory from config - const cwd = Option.getOrElse(config.cwd, () => process.cwd()) - const contextsDir = path.join(cwd, config.dataStorageDir, "contexts") - - const getContextPath = (contextName: string) => path.join(contextsDir, `${contextName}.yaml`) - - // Service methods wrapped with Effect.fn for call-site tracing - // See: https://www.effect.solutions/services-and-layers - - /** - * Save events to a context file. - */ - const save = Effect.fn("ContextRepository.save")( - function*(contextName: string, events: ReadonlyArray) { - const filePath = getContextPath(contextName) - - // Ensure directory exists - yield* fs.makeDirectory(contextsDir, { recursive: true }).pipe( - Effect.catchAll(() => Effect.void) - ) - - // Convert to plain objects for YAML serialization using Schema encoding - const plainEvents = events.map((e) => encodeEvent(e)) - - const yaml = YAML.stringify({ events: plainEvents }) - yield* fs.writeFileString(filePath, yaml).pipe( - Effect.catchAll((error) => - new ContextSaveError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - } - ) - - /** - * Load events from a context file. - * Returns empty array if context doesn't exist. - */ - const load = Effect.fn("ContextRepository.load")( - function*(contextName: string) { - const filePath = getContextPath(contextName) - const exists = yield* fs.exists(filePath).pipe( - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - - if (!exists) { - return [] as Array - } - - return yield* fs.readFileString(filePath).pipe( - Effect.map((yaml) => { - const parsed = YAML.parse(yaml) as { events?: Array } - return decodeEvents(parsed?.events ?? []) - }), - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - } - ) - - /** - * Load events from a context, creating it with default system prompt if it doesn't exist. - */ - const loadOrCreate = Effect.fn("ContextRepository.loadOrCreate")( - function*(contextName: string) { - const filePath = getContextPath(contextName) - const exists = yield* fs.exists(filePath).pipe( - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - - if (!exists) { - const initialEvents = [new SystemPromptEvent({ content: DEFAULT_SYSTEM_PROMPT })] - yield* save(contextName, initialEvents) - return initialEvents - } - - return yield* fs.readFileString(filePath).pipe( - Effect.map((yaml) => { - const parsed = YAML.parse(yaml) as { events?: Array } - return decodeEvents(parsed?.events ?? []) - }), - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(contextName), - cause: error - }) - ) - ) - } - ) - - /** - * Append events to an existing context. - * Loads current events, appends new ones, and saves atomically. - */ - const append = Effect.fn("ContextRepository.append")( - function*(contextName: string, newEvents: ReadonlyArray) { - const existing = yield* load(contextName) - const combined = [...existing, ...newEvents] - yield* save(contextName, combined) - } - ) - - /** - * List all existing context names, sorted by most recently modified first. 
- */ - const list = Effect.fn("ContextRepository.list")( - function*() { - const exists = yield* fs.exists(contextsDir).pipe( - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(""), - cause: error - }) - ) - ) - if (!exists) return [] as Array - - const entries = yield* fs.readDirectory(contextsDir).pipe( - Effect.map((names) => names.filter((name) => name.endsWith(".yaml"))), - Effect.catchAll((error) => - new ContextLoadError({ - name: ContextName.make(""), - cause: error - }) - ) - ) - - // Get modification times for each file - const entriesWithTimes = yield* Effect.all( - entries.map((name) => - fs.stat(path.join(contextsDir, name)).pipe( - Effect.map((stat) => ({ - name: name.replace(/\.yaml$/, ""), - mtime: Option.getOrElse(stat.mtime, () => new Date(0)) - })), - Effect.catchAll(() => Effect.succeed({ name: name.replace(/\.yaml$/, ""), mtime: new Date(0) })) - ) - ) - ) - - // Sort by modification time, most recent first - return entriesWithTimes - .sort((a, b) => b.mtime.getTime() - a.mtime.getTime()) - .map((entry) => entry.name) - } - ) - - return ContextRepository.of({ - load, - loadOrCreate, - save, - append, - list, - getContextsDir: () => contextsDir - }) - }) - ) - - /** - * Test layer with in-memory storage for unit tests. - * See: https://www.effect.solutions/testing - */ - static readonly testLayer = Layer.sync(ContextRepository, () => { - const store = new Map>() - - return ContextRepository.of({ - load: (contextName: string) => Effect.succeed(store.get(contextName) ?? []), - loadOrCreate: (contextName: string) => - Effect.sync(() => { - const existing = store.get(contextName) - if (existing) return existing - const initial = [new SystemPromptEvent({ content: DEFAULT_SYSTEM_PROMPT })] - store.set(contextName, initial) - return initial - }), - save: (contextName: string, events: ReadonlyArray) => - Effect.sync(() => void store.set(contextName, [...events])), - append: (contextName: string, newEvents: ReadonlyArray) => - Effect.sync(() => { - const existing = store.get(contextName) ?? [] - store.set(contextName, [...existing, ...newEvents]) - }), - list: () => Effect.sync(() => Array.from(store.keys()).sort()), - getContextsDir: () => "/test/contexts" - }) - }) -} diff --git a/src/context.service.ts b/src/context.service.ts deleted file mode 100644 index 6669d59..0000000 --- a/src/context.service.ts +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Context Service - * - * The main domain service for working with Contexts. - * - * A Context is a named, ordered list of events representing a conversation. - * The only supported operation is `addEvents`: - * 1. Appends input events (typically UserMessage) to the context - * 2. Triggers an LLM request with the full event history - * 3. Streams back new events (TextDelta ephemeral, AssistantMessage persisted) - * 4. 
Persists the new events to the context file - */ -import type { AiError, LanguageModel } from "@effect/ai" -import type { Error as PlatformError, FileSystem } from "@effect/platform" -import { Context, Effect, Layer, pipe, Schema, Stream } from "effect" -import { - AssistantMessageEvent, - type ContextEvent, - DEFAULT_SYSTEM_PROMPT, - type InputEvent, - PersistedEvent, - type PersistedEvent as PersistedEventType, - SetLlmConfigEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "./context.model.ts" -import { ContextRepository } from "./context.repository.ts" -import type { ContextLoadError, ContextSaveError } from "./errors.ts" -import { CurrentLlmConfig, LlmConfig } from "./llm-config.ts" -import { streamLLMResponse } from "./llm.ts" - -// ============================================================================= -// Context Service -// ============================================================================= - -export class ContextService extends Context.Tag("@app/ContextService")< - ContextService, - { - /** - * Add events to a context, triggering LLM processing if UserMessage present. - * - * This is the core operation on a Context: - * 1. Loads existing events (or creates context with system prompt) - * 2. Appends the input events (UserMessage and/or FileAttachment) - * 3. Runs LLM with full history (only if UserMessage present) - * 4. Streams back TextDelta (ephemeral) and AssistantMessage (persisted) - * 5. Persists new events as they complete - */ - readonly addEvents: ( - contextName: string, - inputEvents: ReadonlyArray - ) => Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig - > - - /** Load all events from a context. */ - readonly load: (contextName: string) => Effect.Effect, ContextLoadError> - - /** List all context names. */ - readonly list: () => Effect.Effect, ContextLoadError> - - /** Persist an event directly (e.g., LLMRequestInterruptedEvent on cancel). */ - readonly persistEvent: ( - contextName: string, - event: PersistedEventType - ) => Effect.Effect - } ->() { - /** - * Production layer with file system persistence and LLM integration. - */ - static readonly layer = Layer.effect( - ContextService, - Effect.gen(function*() { - const repo = yield* ContextRepository - - // Service methods wrapped with Effect.fn for call-site tracing - // See: https://www.effect.solutions/services-and-layers - - const addEvents = ( - contextName: string, - inputEvents: ReadonlyArray - ): Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig - > => { - // Check if any UserMessage is present (triggers LLM) - const hasUserMessage = inputEvents.some(Schema.is(UserMessageEvent)) - - return pipe( - // Load or create context, append input events - Effect.fn("ContextService.addEvents.prepare")(function*() { - const llmConfig = yield* CurrentLlmConfig - - // Check if context exists before loading/creating - const existingEvents = yield* repo.load(contextName) - const isNewContext = existingEvents.length === 0 - - // If new context, create with system prompt and LLM config - const baseEvents = isNewContext - ? 
[ - new SystemPromptEvent({ content: DEFAULT_SYSTEM_PROMPT }), - new SetLlmConfigEvent({ config: llmConfig }) - ] - : existingEvents - - const newPersistedInputs = inputEvents.filter(Schema.is(PersistedEvent)) as Array - - if (isNewContext || newPersistedInputs.length > 0) { - const allEvents = [...baseEvents, ...newPersistedInputs] - yield* repo.save(contextName, allEvents) - return allEvents - } - return baseEvents - })(), - // Only stream LLM response if there's a UserMessage - Effect.andThen((events) => hasUserMessage ? streamLLMResponse(events) : Stream.empty), - Stream.unwrap, - // Persist events as they complete (only persisted ones) - Stream.tap((event) => - Schema.is(PersistedEvent)(event) - ? Effect.gen(function*() { - const current = yield* repo.load(contextName) - yield* repo.save(contextName, [...current, event]) - }) - : Effect.void - ) - ) - } - - const load = Effect.fn("ContextService.load")( - function*(contextName: string) { - return yield* repo.load(contextName) - } - ) - - const list = Effect.fn("ContextService.list")( - function*() { - return yield* repo.list() - } - ) - - const persistEvent = Effect.fn("ContextService.persistEvent")( - function*(contextName: string, event: PersistedEventType) { - const current = yield* repo.load(contextName) - yield* repo.save(contextName, [...current, event]) - } - ) - - return ContextService.of({ - addEvents, - load, - list, - persistEvent - }) - }) - ) - - /** - * Test layer with mock LLM responses for unit tests. - * See: https://www.effect.solutions/testing - */ - static readonly testLayer = Layer.sync(ContextService, () => { - // In-memory store for test contexts - const store = new Map>() - - // Mock LLM config for tests - const mockLlmConfig = new LlmConfig({ - apiFormat: "openai-responses", - model: "test-model", - baseUrl: "https://api.test.com", - apiKeyEnvVar: "TEST_API_KEY" - }) - - return ContextService.of({ - addEvents: ( - contextName: string, - inputEvents: ReadonlyArray - ): Stream.Stream => { - // Load or create context - let events = store.get(contextName) - if (!events) { - events = [ - new SystemPromptEvent({ content: "Test system prompt" }), - new SetLlmConfigEvent({ config: mockLlmConfig }) - ] - store.set(contextName, events) - } - - // Add input events - const newPersistedInputs = inputEvents.filter(Schema.is(PersistedEvent)) as Array - if (newPersistedInputs.length > 0) { - events = [...events, ...newPersistedInputs] - store.set(contextName, events) - } - - // Check if any UserMessage is present - const hasUserMessage = inputEvents.some(Schema.is(UserMessageEvent)) - - // Only generate mock LLM response if there's a UserMessage - if (!hasUserMessage) { - return Stream.empty - } - - // Mock LLM response stream - const mockResponse = "This is a mock response for testing." - const assistantEvent = new AssistantMessageEvent({ content: mockResponse }) - - return Stream.make( - new TextDeltaEvent({ delta: mockResponse }), - assistantEvent - ).pipe( - Stream.tap((event) => - Schema.is(PersistedEvent)(event) - ? Effect.sync(() => { - const current = store.get(contextName) ?? [] - store.set(contextName, [...current, event]) - }) - : Effect.void - ) - ) - }, - - load: (contextName: string) => Effect.succeed(store.get(contextName) ?? []), - - list: () => Effect.sync(() => Array.from(store.keys()).sort()), - - persistEvent: (contextName: string, event: PersistedEventType) => - Effect.sync(() => { - const current = store.get(contextName) ?? 
[] - store.set(contextName, [...current, event]) - }) - }) - }) -} diff --git a/src/new-architecture/domain.ts b/src/domain.ts similarity index 93% rename from src/new-architecture/domain.ts rename to src/domain.ts index 8fd4e8b..b37df92 100644 --- a/src/new-architecture/domain.ts +++ b/src/domain.ts @@ -385,5 +385,28 @@ export const EventBuilder = { ...EventBuilder.makeBase(agentName, contextName, nextEventNumber, false, parentEventId), turnNumber, durationMs + }), + + agentTurnInterrupted: ( + agentName: AgentName, + contextName: ContextName, + nextEventNumber: number, + turnNumber: AgentTurnNumber, + reason: InterruptReason, + partialResponse?: string + ) => + new AgentTurnInterruptedEvent({ + ...EventBuilder.makeBase(agentName, contextName, nextEventNumber, false), + turnNumber, + reason, + partialResponse: partialResponse ? Option.some(partialResponse) : Option.none() }) } + +// ----------------------------------------------------------------------------- +// Default System Prompt +// ----------------------------------------------------------------------------- + +export const DEFAULT_SYSTEM_PROMPT = `You are a helpful, friendly assistant. +Keep your responses concise but informative. +Use markdown formatting when helpful.` diff --git a/src/errors.ts b/src/errors.ts deleted file mode 100644 index fe08d68..0000000 --- a/src/errors.ts +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Domain Error Types - * - * Uses Schema.TaggedError for serializable, type-safe error handling. - * See: https://www.effect.solutions/error-handling - */ -import { Schema } from "effect" -import { ContextName } from "./context.model.ts" - -// ============================================================================= -// Context Errors -// ============================================================================= - -/** Error when a context is not found */ -export class ContextNotFound extends Schema.TaggedError()( - "ContextNotFound", - { name: ContextName } -) {} - -/** Error when loading a context fails */ -export class ContextLoadError extends Schema.TaggedError()( - "ContextLoadError", - { - name: ContextName, - cause: Schema.Defect - } -) {} - -/** Error when saving a context fails */ -export class ContextSaveError extends Schema.TaggedError()( - "ContextSaveError", - { - name: ContextName, - cause: Schema.Defect - } -) {} - -/** Union of all context-related errors */ -export const ContextError = Schema.Union( - ContextNotFound, - ContextLoadError, - ContextSaveError -) -export type ContextError = typeof ContextError.Type - -// ============================================================================= -// Configuration Errors -// ============================================================================= - -/** Error when configuration is invalid or missing */ -export class ConfigurationError extends Schema.TaggedError()( - "ConfigurationError", - { - key: Schema.String, - message: Schema.String - } -) {} - -// ============================================================================= -// LLM Errors -// ============================================================================= - -/** Error when LLM request fails */ -export class LLMError extends Schema.TaggedError()( - "LLMError", - { - message: Schema.String, - cause: Schema.optional(Schema.Defect) - } -) {} diff --git a/src/new-architecture/event-reducer.test.ts b/src/event-reducer.test.ts similarity index 100% rename from src/new-architecture/event-reducer.test.ts rename to src/event-reducer.test.ts diff --git 
a/src/new-architecture/event-reducer.ts b/src/event-reducer.ts similarity index 100% rename from src/new-architecture/event-reducer.ts rename to src/event-reducer.ts diff --git a/src/new-architecture/event-store-fs.ts b/src/event-store-fs.ts similarity index 88% rename from src/new-architecture/event-store-fs.ts rename to src/event-store-fs.ts index 1f61100..1a560fa 100644 --- a/src/new-architecture/event-store-fs.ts +++ b/src/event-store-fs.ts @@ -9,7 +9,7 @@ import { FileSystem, Path } from "@effect/platform" import { Deferred, Effect, Layer, Option, Queue, Ref, Schema } from "effect" import * as YAML from "yaml" -import { AppConfig } from "../config.ts" +import { AppConfig } from "./config.ts" import { ContextEvent, ContextLoadError, type ContextName, ContextSaveError } from "./domain.ts" import { EventStore } from "./event-store.ts" @@ -159,6 +159,25 @@ export const EventStoreFileSystem: Layer.Layer< Effect.catchAll(() => Effect.succeed(false)) ) - return { load, append, exists } as unknown as EventStore + const list = () => + Effect.gen(function*() { + const dirExists = yield* fs.exists(contextsDir).pipe( + Effect.catchAll(() => Effect.succeed(false)) + ) + if (!dirExists) { + return [] as ReadonlyArray<ContextName> + } + + const files = yield* fs.readDirectory(contextsDir).pipe( + Effect.catchAll(() => Effect.succeed([] as ReadonlyArray<string>)) + ) + + // Filter for .yaml files and extract context names + return files + .filter((f) => f.endsWith(".yaml")) + .map((f) => f.replace(/\.yaml$/, "") as ContextName) + }) + + return { load, append, exists, list } as unknown as EventStore }) ) diff --git a/src/new-architecture/event-store.test.ts b/src/event-store.test.ts similarity index 100% rename from src/new-architecture/event-store.test.ts rename to src/event-store.test.ts diff --git a/src/new-architecture/event-store.ts b/src/event-store.ts similarity index 86% rename from src/new-architecture/event-store.ts rename to src/event-store.ts index 2406b05..48fbfc5 100644 --- a/src/new-architecture/event-store.ts +++ b/src/event-store.ts @@ -3,7 +3,7 @@ * * Pluggable implementations: * - InMemory: For tests (fresh state per layer creation) - * - Future: Yaml files, Postgres, Redis + * - FileSystem: For production (YAML files) */ import { Effect, Layer } from "effect" @@ -18,7 +18,8 @@ export class EventStore extends Effect.Service<EventStore>()("@mini-agent/EventS Effect.succeed([]), append: (_contextName: ContextName, _events: ReadonlyArray<ContextEvent>): Effect.Effect<void> => Effect.void, - exists: (_contextName: ContextName): Effect.Effect<boolean> => Effect.succeed(false) + exists: (_contextName: ContextName): Effect.Effect<boolean> => Effect.succeed(false), + list: (): Effect.Effect<ReadonlyArray<ContextName>> => Effect.succeed([]) }, accessors: true }) { @@ -38,7 +39,9 @@ store.set(contextName, [...existing, ...events]) }), - exists: (contextName: ContextName) => Effect.sync(() => store.has(contextName)) + exists: (contextName: ContextName) => Effect.sync(() => store.has(contextName)), + + list: () => Effect.sync(() => Array.from(store.keys())) } as unknown as EventStore }) } diff --git a/src/http.ts b/src/http.ts index 0deb697..b031311 100644 --- a/src/http.ts +++ b/src/http.ts @@ -4,11 +4,9 @@ * Provides HTTP endpoints that mirror the CLI interface: * - POST /context/:contextName - Send events, receive SSE stream of responses */ -import { LanguageModel } from "@effect/ai" -import { FileSystem, HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" -import { Effect, Schema, Stream } from
"effect" -import type { ContextEvent } from "./context.model.ts" -import { CurrentLlmConfig } from "./llm-config.ts" +import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" +import { Console, Effect, Schema, Stream } from "effect" +import { type ContextEvent } from "./domain.ts" import { AgentServer, ScriptInputEvent } from "./server.service.ts" /** Encode a ContextEvent as an SSE data line */ @@ -48,11 +46,6 @@ const contextHandler = Effect.gen(function*() { const agentServer = yield* AgentServer const params = yield* HttpRouter.params - // Get context services to provide to the stream - const langModel = yield* LanguageModel.LanguageModel - const fs = yield* FileSystem.FileSystem - const llmConfig = yield* CurrentLlmConfig - const contextName = params.contextName if (!contextName) { return HttpServerResponse.text("Missing contextName", { status: 400 }) @@ -77,17 +70,14 @@ const contextHandler = Effect.gen(function*() { return HttpServerResponse.text(message, { status: 400 }) } - const events = parseResult.right - if (events.length === 0) { + const inputEvents = parseResult.right + if (inputEvents.length === 0) { return HttpServerResponse.text("No valid events in body", { status: 400 }) } - // Stream SSE events directly - provide services to remove context requirements - const sseStream = agentServer.handleRequest(contextName, events).pipe( - Stream.map(encodeSSE), - Stream.provideService(LanguageModel.LanguageModel, langModel), - Stream.provideService(FileSystem.FileSystem, fs), - Stream.provideService(CurrentLlmConfig, llmConfig) + // Use AgentServer to process events and stream response + const sseStream = agentServer.handleRequest(contextName, inputEvents).pipe( + Stream.map(encodeSSE) ) return HttpServerResponse.stream(sseStream, { @@ -113,6 +103,6 @@ export const makeRouter = HttpRouter.empty.pipe( /** Run the server and log the address - for standalone use */ export const runServer = Effect.gen(function*() { - yield* Effect.log("Server started") + yield* Console.log("Server started") return yield* Effect.never }) diff --git a/src/layercode/cli.ts b/src/layercode/cli.ts index e434987..6f5c973 100644 --- a/src/layercode/cli.ts +++ b/src/layercode/cli.ts @@ -8,8 +8,12 @@ import type { CommandExecutor } from "@effect/platform" import { Command as PlatformCommand, HttpRouter, HttpServer } from "@effect/platform" import { BunHttpServer } from "@effect/platform-bun" import { Console, Effect, Layer, Option, Stream } from "effect" +import { AgentRegistry } from "../agent-registry.ts" import { AppConfig } from "../config.ts" +import { EventReducer } from "../event-reducer.ts" +import { EventStoreFileSystem } from "../event-store-fs.ts" import { makeRouter } from "../http.ts" +import { LlmTurnLive } from "../llm-turn.ts" import { AgentServer } from "../server.service.ts" import { makeLayerCodeRouter } from "./layercode.adapter.ts" @@ -147,9 +151,17 @@ const layercodeServeCommand = CliCommand.make( // Create server layer with configured port/host const serverLayer = BunHttpServer.layer({ port: actualPort, hostname: actualHost }) + // Create agent layer (AgentServer needs AgentRegistry) + const agentLayer = AgentServer.layer.pipe( + Layer.provide(AgentRegistry.Default), + Layer.provide(LlmTurnLive), + Layer.provide(EventStoreFileSystem), + Layer.provide(EventReducer.Default) + ) + const layers = Layer.mergeAll( serverLayer, - AgentServer.layer + agentLayer ) // Start the tunnel if enabled (fork it to run concurrently with server) diff --git 
a/src/layercode/layercode.adapter.ts b/src/layercode/layercode.adapter.ts index 71de0e6..a5750c8 100644 --- a/src/layercode/layercode.adapter.ts +++ b/src/layercode/layercode.adapter.ts @@ -14,12 +14,10 @@ * → * data: {"type":"response.tts","content":"Hi","turn_id":"123"} */ -import { LanguageModel } from "@effect/ai" -import { FileSystem, HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" +import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" import { Effect, Option, Schema, Stream } from "effect" import { AppConfig } from "../config.ts" -import { AssistantMessageEvent, type ContextEvent, TextDeltaEvent, UserMessageEvent } from "../context.model.ts" -import { CurrentLlmConfig } from "../llm-config.ts" +import { AssistantMessageEvent, type ContextEvent, TextDeltaEvent } from "../domain.ts" import { AgentServer } from "../server.service.ts" import { maybeVerifySignature } from "./signature.ts" @@ -115,11 +113,6 @@ const layercodeWebhookHandler = (welcomeMessage: Option.Option) => const agentServer = yield* AgentServer const config = yield* AppConfig - // Get context services to provide to the stream - const langModel = yield* LanguageModel.LanguageModel - const fs = yield* FileSystem.FileSystem - const llmConfig = yield* CurrentLlmConfig - yield* Effect.logDebug("POST /layercode/webhook") // Read body @@ -163,17 +156,14 @@ const layercodeWebhookHandler = (welcomeMessage: Option.Option) => const contextName = sessionToContextName(webhookEvent.session_id) const turnId = webhookEvent.turn_id - // Convert to our format - const userMessage = new UserMessageEvent({ content: webhookEvent.text }) + // Convert to our input format + const userMessage = { _tag: "UserMessage" as const, content: webhookEvent.text } - // Stream SSE events directly - provide services to remove context requirements + // Stream SSE events directly const sseStream = agentServer.handleRequest(contextName, [userMessage]).pipe( Stream.map((event) => toLayerCodeResponse(event, turnId)), Stream.filter((r): r is LayerCodeResponse => r !== null), - Stream.map(encodeLayerCodeSSE), - Stream.provideService(LanguageModel.LanguageModel, langModel), - Stream.provideService(FileSystem.FileSystem, fs), - Stream.provideService(CurrentLlmConfig, llmConfig) + Stream.map(encodeLayerCodeSSE) ) return HttpServerResponse.stream(sseStream, { @@ -281,9 +271,6 @@ export const makeLayerCodeRouter = ( never, | AgentServer | AppConfig - | LanguageModel.LanguageModel - | FileSystem.FileSystem - | CurrentLlmConfig > => HttpRouter.empty.pipe( HttpRouter.post("/layercode/webhook", layercodeWebhookHandler(welcomeMessage)) diff --git a/src/new-architecture/llm-turn.ts b/src/llm-turn.ts similarity index 98% rename from src/new-architecture/llm-turn.ts rename to src/llm-turn.ts index 5178c9f..e6803c4 100644 --- a/src/new-architecture/llm-turn.ts +++ b/src/llm-turn.ts @@ -7,7 +7,6 @@ import { type AiError, LanguageModel, Prompt } from "@effect/ai" import { DateTime, Effect, Layer, Option, pipe, Ref, Stream } from "effect" -import { CurrentLlmConfig } from "../llm-config.ts" import { AgentError, type AgentName, @@ -19,6 +18,7 @@ import { type ReducedContext, TextDeltaEvent } from "./domain.ts" +import { CurrentLlmConfig } from "./llm-config.ts" /** * Convert ReducedContext messages to @effect/ai Prompt. 
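Aside (not part of the patch): with the LanguageModel/FileSystem/CurrentLlmConfig plumbing now hidden behind AgentServer, an HTTP or webhook client only ever sees SSE frames whose data lines are ContextEvent JSON. A minimal consumer sketch in TypeScript, assuming a server from this patch on a placeholder localhost:3000, a placeholder context name, and a JSONL request body (one ScriptInputEvent per line) as the http.ts header comment describes:

// Illustrative SSE consumer for POST /context/:contextName (assumptions above).
const res = await fetch("http://localhost:3000/context/demo", {
  method: "POST",
  body: JSON.stringify({ _tag: "UserMessage", content: "Hello" }) + "\n"
})
const reader = res.body!.getReader()
const decoder = new TextDecoder()
let buffer = ""
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  buffer += decoder.decode(value, { stream: true })
  // SSE frames are separated by a blank line; keep any incomplete tail buffered
  const frames = buffer.split("\n\n")
  buffer = frames.pop() ?? ""
  for (const frame of frames) {
    if (!frame.startsWith("data: ")) continue
    const event = JSON.parse(frame.slice("data: ".length))
    if (event._tag === "TextDeltaEvent") process.stdout.write(event.delta)
    if (event._tag === "AssistantMessageEvent") process.stdout.write("\n")
  }
}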
diff --git a/src/llm.ts b/src/llm.ts deleted file mode 100644 index 0cb69b2..0000000 --- a/src/llm.ts +++ /dev/null @@ -1,178 +0,0 @@ -/** - * LLM Request - * - * Pure function that takes persisted events and produces a stream of context events. - */ -import { type AiError, LanguageModel, Prompt } from "@effect/ai" -import { type Error as PlatformError, FileSystem } from "@effect/platform" -import { Clock, Effect, Option, pipe, Ref, Schema, Stream } from "effect" -import { - AssistantMessageEvent, - type ContextEvent, - FileAttachmentEvent, - LLMRequestInterruptedEvent, - type PersistedEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "./context.model.ts" -import { CurrentLlmConfig } from "./llm-config.ts" - -// ============================================================================= -// Event to Prompt Conversion -// ============================================================================= - -const isSystem = Schema.is(SystemPromptEvent) -const isAssistant = Schema.is(AssistantMessageEvent) -const isUser = Schema.is(UserMessageEvent) -const isFile = Schema.is(FileAttachmentEvent) -const isInterrupted = Schema.is(LLMRequestInterruptedEvent) - -/** - * Groups consecutive user events (messages + attachments) into single multi-part messages. - * File attachments are read at call time, not persisted as base64. - * @internal Exported for testing - */ -export const eventsToPrompt = ( - events: ReadonlyArray -): Effect.Effect => - Effect.gen(function*() { - const fs = yield* FileSystem.FileSystem - const messages: Array = [] - - let i = 0 - while (i < events.length) { - const event = events[i]! - - if (isSystem(event)) { - messages.push(Prompt.makeMessage("system", { content: event.content })) - i++ - } else if (isAssistant(event)) { - messages.push( - Prompt.makeMessage("assistant", { - content: [Prompt.makePart("text", { text: event.content })] - }) - ) - i++ - } else if (isInterrupted(event)) { - // Only include if there was actual content before interruption - if (event.partialResponse.trim()) { - // Include partial response as assistant message - messages.push( - Prompt.makeMessage("assistant", { - content: [Prompt.makePart("text", { text: event.partialResponse })] - }) - ) - // Add user message explaining the interruption - messages.push( - Prompt.makeMessage("user", { - content: [Prompt.makePart("text", { - text: - `Your previous response was interrupted by the user. Here is what you said until that point:\n\n${event.partialResponse}` - })] - }) - ) - } - i++ - } else if (isUser(event) || isFile(event)) { - // Consecutive user/file events become a single multi-part user message - const userParts: Array = [] - - while (i < events.length) { - const e = events[i]! - if (isFile(e)) { - if (e.source.type === "file") { - const bytes = yield* fs.readFile(e.source.path) - userParts.push( - Prompt.makePart("file", { - mediaType: e.mediaType, - data: bytes, - fileName: e.fileName - }) - ) - } else { - userParts.push( - Prompt.makePart("file", { - mediaType: e.mediaType, - data: new URL(e.source.url), - fileName: e.fileName - }) - ) - } - i++ - } else if (isUser(e)) { - userParts.push(Prompt.makePart("text", { text: e.content })) - i++ - } else { - break - } - } - - if (userParts.length > 0) { - messages.push(Prompt.makeMessage("user", { content: userParts })) - } - } else { - i++ - } - } - - return Prompt.make(messages) - }) - -/** - * Stream an LLM response from persisted conversation events. - * - * Takes the full conversation history and produces: - * 1. 
TextDelta events for each streaming chunk (ephemeral) - * 2. A final AssistantMessage event with the complete response (persisted) - */ -export const streamLLMResponse = ( - events: ReadonlyArray -): Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig -> => - Stream.unwrap( - Effect.fn("LLM.streamLLMResponse")(function*() { - const model = yield* LanguageModel.LanguageModel - const llmConfig = yield* CurrentLlmConfig - const fullResponseRef = yield* Ref.make("") - const firstTokenReceivedRef = yield* Ref.make(false) - - yield* Effect.annotateCurrentSpan("gen_ai.base_url", llmConfig.baseUrl) - - yield* Effect.logDebug(`Streaming LLM response`, { - model: llmConfig.model, - apiFormat: llmConfig.apiFormat - }) - - const prompt = yield* eventsToPrompt(events) - const startTime = yield* Clock.currentTimeMillis - - return pipe( - model.streamText({ prompt }), - Stream.filterMap((part) => part.type === "text-delta" ? Option.some(part.delta) : Option.none()), - Stream.mapEffect((delta) => - Effect.gen(function*() { - const alreadyRecorded = yield* Ref.get(firstTokenReceivedRef) - if (!alreadyRecorded) { - yield* Ref.set(firstTokenReceivedRef, true) - const now = yield* Clock.currentTimeMillis - const ttft = now - startTime - yield* Effect.annotateCurrentSpan("gen_ai.time_to_first_token_ms", ttft) - } - yield* Ref.update(fullResponseRef, (t) => t + delta) - return new TextDeltaEvent({ delta }) as ContextEvent - }) - ), - Stream.concat( - Stream.fromEffect( - Ref.get(fullResponseRef).pipe( - Effect.map((content) => new AssistantMessageEvent({ content }) as ContextEvent) - ) - ) - ) - ) - })() - ) diff --git a/src/new-architecture/mini-agent.test.ts b/src/mini-agent.test.ts similarity index 100% rename from src/new-architecture/mini-agent.test.ts rename to src/mini-agent.test.ts diff --git a/src/new-architecture/mini-agent.ts b/src/mini-agent.ts similarity index 100% rename from src/new-architecture/mini-agent.ts rename to src/mini-agent.ts diff --git a/src/new-architecture/cli.e2e.test.ts b/src/new-architecture/cli.e2e.test.ts deleted file mode 100644 index 7937d17..0000000 --- a/src/new-architecture/cli.e2e.test.ts +++ /dev/null @@ -1,168 +0,0 @@ -/** - * CLI E2E Tests for new architecture. - * - * Tests the new actor-based CLI functionality. - * Uses mock LLM server by default, or real LLM with USE_REAL_LLM=1. 
- */ -import { Command } from "@effect/platform" -import { BunContext } from "@effect/platform-bun" -import { Effect, Stream } from "effect" -import * as fs from "node:fs" -import * as path from "node:path" -import { describe } from "vitest" -import { expect, test, useRealLlm } from "../../test/fixtures.js" - -const CLI_PATH = path.resolve(__dirname, "./cli.ts") - -interface CliResult { - stdout: string - stderr: string - exitCode: number -} - -/** Run CLI with full control over env and output capture */ -const runCli = ( - args: Array, - options: { cwd?: string; env?: Record } = {} -): Effect.Effect => { - // Inherit all parent env vars (including API keys from Doppler) - const env = { - ...process.env, - ...options.env - } - - // Build command - use workingDirectory instead of --cwd arg - let cmd = Command.make("bun", CLI_PATH, ...args) - if (options.cwd) cmd = Command.workingDirectory(cmd, options.cwd) - cmd = Command.env(cmd, env) - - return Effect.scoped( - Effect.gen(function*() { - const proc = yield* Command.start(cmd) - const [stdout, stderr, exitCode] = yield* Effect.all([ - proc.stdout.pipe(Stream.decodeText(), Stream.mkString), - proc.stderr.pipe(Stream.decodeText(), Stream.mkString), - proc.exitCode - ]) - return { stdout, stderr, exitCode } - }) - ).pipe( - Effect.provide(BunContext.layer), - Effect.catchAll((e) => Effect.succeed({ stdout: "", stderr: String(e), exitCode: 1 })) - ) -} - -/** Extract JSON objects from CLI response */ -const extractJsonOutput = (output: string): string => { - const jsonObjects: Array = [] - let depth = 0 - let start = -1 - - for (let i = 0; i < output.length; i++) { - if (output[i] === "{") { - if (depth === 0) start = i - depth++ - } else if (output[i] === "}") { - depth-- - if (depth === 0 && start !== -1) { - const obj = output.slice(start, i + 1) - if (obj.includes("\"_tag\"")) { - jsonObjects.push(obj) - } - start = -1 - } - } - } - - return jsonObjects.join("\n") -} - -describe("New Architecture CLI", () => { - describe("--help", () => { - test("shows help message", async () => { - const result = await Effect.runPromise(runCli(["--help"])) - expect(result.stdout).toContain("mini-agent-v2") - expect(result.stdout).toContain("chat") - }) - - test("shows chat help", async () => { - const result = await Effect.runPromise(runCli(["chat", "--help"])) - expect(result.stdout).toContain("--name") - expect(result.stdout).toContain("--message") - expect(result.stdout).toContain("--raw") - }) - }) - - describe("chat command", () => { - // Skip LLM tests when not using real LLM - mock server integration needs work - test.skipIf(!useRealLlm)("sends a message and gets a response", { timeout: 60000 }, async ({ testDir }) => { - const result = await Effect.runPromise( - runCli(["chat", "-n", "test-context", "-m", "Say exactly: TEST_V2_RESPONSE"], { cwd: testDir }) - ) - - expect(result.stdout.length).toBeGreaterThan(0) - expect(result.exitCode).toBe(0) - }) - - test.skipIf(!useRealLlm)("outputs JSON in raw mode", { timeout: 60000 }, async ({ testDir }) => { - const result = await Effect.runPromise( - runCli(["chat", "-n", "raw-test", "-m", "Say exactly: RAW_TEST", "--raw"], { cwd: testDir }) - ) - - expect(result.exitCode).toBe(0) - const jsonOutput = extractJsonOutput(result.stdout) - expect(jsonOutput).toContain("\"_tag\"") - expect(jsonOutput).toContain("\"AssistantMessageEvent\"") - }) - - test.skipIf(!useRealLlm)("creates context file", { timeout: 60000 }, async ({ testDir }) => { - await Effect.runPromise( - runCli(["chat", "-n", "persist-test", 
"-m", "Hello"], { cwd: testDir }) - ) - - // Context file should exist in v2 contexts dir - const contextsDir = path.join(testDir, ".mini-agent", "contexts-v2") - expect(fs.existsSync(contextsDir)).toBe(true) - - const files = fs.readdirSync(contextsDir) - expect(files.some((f) => f.includes("persist-test"))).toBe(true) - }) - - test.skipIf(!useRealLlm)("maintains conversation history", { timeout: 90000 }, async ({ testDir }) => { - // First message - await Effect.runPromise( - runCli(["chat", "-n", "history-test", "-m", "Remember: my secret code is ABC123"], { cwd: testDir }) - ) - - // Second message - const result = await Effect.runPromise( - runCli(["chat", "-n", "history-test", "-m", "What is my secret code?", "--raw"], { cwd: testDir }) - ) - - expect(result.exitCode).toBe(0) - expect(result.stdout.toLowerCase()).toContain("abc123") - }) - }) -}) - -describe("Multi-LLM", () => { - const llms = [ - { llm: "openai:gpt-4.1-mini" }, - { llm: "anthropic:claude-haiku-4-5" } - ] as const - - // Skip multi-LLM tests when not using real LLM - describe.skipIf(!useRealLlm).each(llms)("LLM: $llm", ({ llm }) => { - test( - "basic chat works", - { timeout: 60000 }, - async ({ testDir }) => { - const result = await Effect.runPromise( - runCli(["chat", "-n", "llm-test", "-m", "Say exactly: SUCCESS"], { cwd: testDir, env: { LLM: llm } }) - ) - expect(result.stdout.length).toBeGreaterThan(0) - expect(result.exitCode).toBe(0) - } - ) - }) -}) diff --git a/src/new-architecture/cli.ts b/src/new-architecture/cli.ts deleted file mode 100644 index fb26598..0000000 --- a/src/new-architecture/cli.ts +++ /dev/null @@ -1,195 +0,0 @@ -/** - * CLI wrapper for new architecture. - * - * Simple CLI entry point demonstrating the actor-based architecture. - * Usage: bun run src/new-architecture/cli.ts chat -n -m - */ - -import { AnthropicClient, AnthropicLanguageModel } from "@effect/ai-anthropic" -import { GoogleClient, GoogleLanguageModel } from "@effect/ai-google" -import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai" -import { Command, Options } from "@effect/cli" -import { FetchHttpClient, Terminal } from "@effect/platform" -import { BunContext, BunRuntime } from "@effect/platform-bun" -import { ConfigProvider, Effect, Fiber, Layer, LogLevel, Option, Stream } from "effect" -import { AppConfig, type MiniAgentConfig } from "../config.ts" -import { CurrentLlmConfig, getApiKey, type LlmConfig, resolveLlmConfig } from "../llm-config.ts" -import { createLoggingLayer } from "../logging.ts" -import { OpenAiChatClient, OpenAiChatLanguageModel } from "../openai-chat-completions-client.ts" -import { AgentRegistry } from "./agent-registry.ts" -import { type AgentName, EventBuilder } from "./domain.ts" -import { EventReducer } from "./event-reducer.ts" -import { EventStoreFileSystem } from "./event-store-fs.ts" -import { LlmTurnLive } from "./llm-turn.ts" - -const makeLanguageModelLayer = (llmConfig: LlmConfig) => { - const apiKey = getApiKey(llmConfig) - - switch (llmConfig.apiFormat) { - case "openai-responses": - return OpenAiLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "openai-chat-completions": - return OpenAiChatLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiChatClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "anthropic": - return 
AnthropicLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - AnthropicClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "gemini": - return GoogleLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - GoogleClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - } -} - -// CLI Options -const nameOption = Options.text("name").pipe( - Options.withAlias("n"), - Options.withDescription("Context/agent name"), - Options.withDefault("default") -) - -const messageOption = Options.text("message").pipe( - Options.withAlias("m"), - Options.withDescription("Message to send") -) - -const rawOption = Options.boolean("raw").pipe( - Options.withAlias("r"), - Options.withDescription("Output raw JSON events"), - Options.withDefault(false) -) - -const cwdOption = Options.directory("cwd").pipe( - Options.withDescription("Working directory"), - Options.optional -) - -// Chat command -const chatCommand = Command.make( - "chat", - { name: nameOption, message: messageOption, raw: rawOption }, - ({ message, name, raw }) => - Effect.gen(function*() { - const terminal = yield* Terminal.Terminal - const registry = yield* AgentRegistry - - const agentName = name as AgentName - const agent = yield* registry.getOrCreate(agentName) - - // Get current context to know event number - const ctx = yield* agent.getReducedContext - - // Subscribe to events BEFORE adding the user message - // Fork the stream consumption so it runs in parallel - const streamFiber = yield* agent.events.pipe( - Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), - Stream.tap((event) => { - if (raw) { - return terminal.display(JSON.stringify(event) + "\n") - } else if (event._tag === "TextDeltaEvent") { - return terminal.display(event.delta) - } else if (event._tag === "AssistantMessageEvent") { - return terminal.display("\n") - } - return Effect.void - }), - Stream.runDrain, - Effect.fork - ) - - // Add user message with triggersAgentTurn=true - const userEvent = EventBuilder.userMessage( - agentName, - agent.contextName, - ctx.nextEventNumber, - message - ) - yield* agent.addEvent(userEvent) - - // Wait for stream to complete - yield* Fiber.join(streamFiber) - }) -) - -// Root command -const cli = Command.make("mini-agent-v2", { cwd: cwdOption }).pipe( - Command.withSubcommands([chatCommand]) -) - -const cliApp = Command.run(cli, { - name: "mini-agent-v2", - version: "2.0.0" -}) - -// Default config for CLI -const defaultConfig: MiniAgentConfig = { - llm: "openai:gpt-4o-mini", - dataStorageDir: ".mini-agent", - configFile: "mini-agent.config.yaml", - cwd: Option.none(), - stdoutLogLevel: LogLevel.Warning, - fileLogLevel: LogLevel.Debug, - port: 3000, - host: "0.0.0.0", - layercodeWebhookSecret: Option.none() -} - -const appConfigLayer = Layer.succeed(AppConfig, defaultConfig) - -const program = Effect.gen(function*() { - const llmConfig = yield* resolveLlmConfig.pipe(Effect.withConfigProvider(ConfigProvider.fromEnv())) - yield* Effect.logDebug("Using LLM config", { provider: llmConfig.apiFormat, model: llmConfig.model }) - - const languageModelLayer = makeLanguageModelLayer(llmConfig) - const llmConfigLayer = CurrentLlmConfig.fromConfig(llmConfig) - - // Build the full layer stack - // AgentRegistry.Default requires EventStore, EventReducer, and MiniAgentTurn - const fullLayer = AgentRegistry.Default.pipe( - Layer.provide(LlmTurnLive), - 
Layer.provide(languageModelLayer), - Layer.provide(llmConfigLayer), - Layer.provide(EventStoreFileSystem), - Layer.provide(EventReducer.Default), - Layer.provide(appConfigLayer), - Layer.provide(BunContext.layer) - ) - - yield* cliApp(process.argv).pipe(Effect.provide(fullLayer)) -}) - -const loggingLayer = createLoggingLayer({ - stdoutLogLevel: LogLevel.Warning, - fileLogLevel: LogLevel.Debug, - baseDir: ".mini-agent" -}) - -const mainLayer = Layer.mergeAll(loggingLayer, BunContext.layer) - -program.pipe( - Effect.provide(mainLayer), - BunRuntime.runMain -) diff --git a/src/new-architecture/http.ts b/src/new-architecture/http.ts deleted file mode 100644 index 1913b5e..0000000 --- a/src/new-architecture/http.ts +++ /dev/null @@ -1,112 +0,0 @@ -/** - * HTTP Server for new architecture. - * - * Endpoints: - * - POST /agent/:agentName - Send message, receive SSE stream of events - * - GET /health - Health check - */ - -import { HttpRouter, HttpServerRequest, HttpServerResponse } from "@effect/platform" -import { Effect, Schema, Stream } from "effect" -import { AgentRegistry } from "./agent-registry.ts" -import { type AgentName, type ContextEvent, EventBuilder } from "./domain.ts" - -/** Encode a ContextEvent as an SSE data line */ -const encodeSSE = (event: ContextEvent): Uint8Array => new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`) - -/** Input message schema */ -const InputMessage = Schema.Struct({ - _tag: Schema.Literal("UserMessage"), - content: Schema.String -}) -type InputMessage = typeof InputMessage.Type - -/** Parse JSON body into InputMessage */ -const parseBody = (body: string) => - Effect.gen(function*() { - const json = yield* Effect.try({ - try: () => JSON.parse(body) as unknown, - catch: (e) => new Error(`Invalid JSON: ${e instanceof Error ? e.message : String(e)}`) - }) - return yield* Schema.decodeUnknown(InputMessage)(json) - }) - -/** Handler for POST /agent/:agentName */ -const agentHandler = Effect.gen(function*() { - const request = yield* HttpServerRequest.HttpServerRequest - const registry = yield* AgentRegistry - const params = yield* HttpRouter.params - - const agentName = params.agentName - if (!agentName) { - return HttpServerResponse.text("Missing agentName", { status: 400 }) - } - - yield* Effect.logDebug("POST /agent/:agentName", { agentName }) - - // Read body - const body = yield* request.text - - if (body.trim() === "") { - return HttpServerResponse.text("Empty request body", { status: 400 }) - } - - const parseResult = yield* parseBody(body).pipe(Effect.either) - - if (parseResult._tag === "Left") { - return HttpServerResponse.text(parseResult.left.message, { status: 400 }) - } - - const message = parseResult.right - - // Get or create agent - const agent = yield* registry.getOrCreate(agentName as AgentName) - const ctx = yield* agent.getReducedContext - - // Prepare user event - const userEvent = EventBuilder.userMessage( - agentName as AgentName, - agent.contextName, - ctx.nextEventNumber, - message.content - ) - - // Create an SSE stream that: - // 1. First emits the user event (so client sees it immediately) - // 2. Adds the event to the agent (which triggers the LLM turn) - // 3. 
Then streams all subsequent events until turn completes - const sseStream = Stream.concat( - // Emit user event immediately to client, then add it to agent - Stream.fromEffect( - agent.addEvent(userEvent).pipe( - Effect.as(encodeSSE(userEvent)), - Effect.catchAll(() => Effect.succeed(encodeSSE(userEvent))) - ) - ), - // Stream remaining events (the broadcast will include events after UserMessage) - agent.events.pipe( - Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), - Stream.map(encodeSSE) - ) - ) - - return HttpServerResponse.stream(sseStream, { - contentType: "text/event-stream", - headers: { - "Cache-Control": "no-cache", - "Connection": "keep-alive" - } - }) -}) - -/** Health check endpoint */ -const healthHandler = Effect.gen(function*() { - yield* Effect.logDebug("GET /health") - return yield* HttpServerResponse.json({ status: "ok" }) -}) - -/** HTTP router for new architecture */ -export const makeRouterV2 = HttpRouter.empty.pipe( - HttpRouter.post("/agent/:agentName", agentHandler), - HttpRouter.get("/health", healthHandler) -) diff --git a/src/new-architecture/index.ts b/src/new-architecture/index.ts deleted file mode 100644 index 8f16265..0000000 --- a/src/new-architecture/index.ts +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Mini-Agent Actor Architecture - * - * Event-sourced actor system for LLM interactions. - * Philosophy: "Agent events are all you need" - * - * @module - */ - -// Domain types and events -export { - // Config types - AgentConfig, - // Error types - AgentError, - // Branded types - AgentName, - AgentNotFoundError, - // Event types - AgentTurnCompletedEvent, - AgentTurnFailedEvent, - AgentTurnInterruptedEvent, - AgentTurnNumber, - AgentTurnStartedEvent, - AssistantMessageEvent, - ContextEvent, - ContextLoadError, - ContextName, - ContextSaveError, - defaultAgentConfig, - // Utilities - EventBuilder, - EventId, - InterruptReason, - LlmProviderConfig, - LlmProviderId, - makeEventId, - // Service types - type MiniAgent, - MiniAgentTurn, - // State types - type ReducedContext, - ReducedContext as ReducedContextHelpers, - ReducerError, - SessionEndedEvent, - SessionStartedEvent, - SetLlmConfigEvent, - SetTimeoutEvent, - SystemPromptEvent, - TextDeltaEvent, - UserMessageEvent -} from "./domain.ts" - -// Services -export { AgentRegistry } from "./agent-registry.ts" -export { EventReducer } from "./event-reducer.ts" -export { EventStore } from "./event-store.ts" - -// Agent factory -export { makeMiniAgent } from "./mini-agent.ts" - -// Production layers -export { EventStoreFileSystem } from "./event-store-fs.ts" -export { LlmTurnLive } from "./llm-turn.ts" diff --git a/src/new-architecture/server.e2e.test.ts b/src/new-architecture/server.e2e.test.ts deleted file mode 100644 index b025ca4..0000000 --- a/src/new-architecture/server.e2e.test.ts +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Server E2E Tests for new architecture. - * - * Tests the HTTP server functionality. - * Uses mock LLM server by default, or real LLM with USE_REAL_LLM=1. 
- */ -import { spawn } from "child_process" -import * as path from "node:path" -import { describe } from "vitest" -import { expect, test, useRealLlm } from "../../test/fixtures.js" - -const SERVER_PATH = path.resolve(__dirname, "./server.ts") - -// Port counter to avoid conflicts -let portCounter = 5000 + Math.floor(Math.random() * 1000) -const getNextPort = () => portCounter++ - -/** Start the server in background */ -const startServer = async ( - cwd: string -): Promise<{ port: number; cleanup: () => Promise }> => { - const port = getNextPort() - - const proc = spawn("bun", [SERVER_PATH, "--port", String(port)], { - cwd, - env: { - ...process.env - }, - stdio: "ignore" - }) - - const cleanup = async () => { - if (!proc.killed) { - proc.kill("SIGTERM") - await new Promise((resolve) => { - const timeout = setTimeout(() => { - proc.kill("SIGKILL") - resolve() - }, 2000) - proc.on("exit", () => { - clearTimeout(timeout) - resolve() - }) - }) - } - } - - // Wait for server to be ready - for (let i = 0; i < 100; i++) { - try { - const res = await fetch(`http://localhost:${port}/health`) - if (res.ok) { - await new Promise((resolve) => setTimeout(resolve, 50)) - return { port, cleanup } - } - } catch { - // Server not ready yet - } - await new Promise((resolve) => setTimeout(resolve, 100)) - } - - await cleanup() - throw new Error(`Server failed to start on port ${port}`) -} - -/** Parse SSE stream from response */ -const parseSSE = async (response: Response): Promise> => { - const text = await response.text() - return text - .split("\n") - .filter((line) => line.startsWith("data: ")) - .map((line) => line.slice(6)) -} - -describe("New Architecture Server", () => { - test("health endpoint returns ok", { timeout: 30000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const response = await fetch(`http://localhost:${port}/health`) - const body = (await response.json()) as { status: string } - - expect(response.status).toBe(200) - expect(body.status).toBe("ok") - } finally { - await cleanup() - } - }) - - // Skip LLM tests when not using real LLM - mock server integration needs work - test.skipIf(!useRealLlm)("agent endpoint processes message", { timeout: 60000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const response = await fetch(`http://localhost:${port}/agent/test-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ _tag: "UserMessage", content: "Say exactly: HELLO_V2" }) - }) - - expect(response.status).toBe(200) - expect(response.headers.get("content-type")).toContain("text/event-stream") - - const events = await parseSSE(response) - expect(events.length).toBeGreaterThan(0) - - // Should have AssistantMessageEvent - const hasAssistant = events.some((e) => e.includes("\"AssistantMessageEvent\"")) - expect(hasAssistant).toBe(true) - } finally { - await cleanup() - } - }) - - test("agent endpoint returns 400 for empty body", { timeout: 30000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const response = await fetch(`http://localhost:${port}/agent/test-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: "" - }) - - expect(response.status).toBe(400) - } finally { - await cleanup() - } - }) - - test("agent endpoint returns 400 for invalid JSON", { timeout: 30000 }, async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - const 
response = await fetch(`http://localhost:${port}/agent/test-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: "not json" - }) - - expect(response.status).toBe(400) - } finally { - await cleanup() - } - }) - - test.skipIf(!useRealLlm)( - "maintains conversation history across requests", - { timeout: 90000 }, - async ({ testDir }) => { - const { cleanup, port } = await startServer(testDir) - - try { - // First message - await fetch(`http://localhost:${port}/agent/history-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ _tag: "UserMessage", content: "Remember: my code is XYZ789" }) - }) - - // Second message - const response = await fetch(`http://localhost:${port}/agent/history-agent`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ _tag: "UserMessage", content: "What is my code?" }) - }) - - const events = await parseSSE(response) - const fullResponse = events.join("") - - expect(fullResponse.toLowerCase()).toContain("xyz789") - } finally { - await cleanup() - } - } - ) -}) diff --git a/src/new-architecture/server.ts b/src/new-architecture/server.ts deleted file mode 100644 index 8ebadde..0000000 --- a/src/new-architecture/server.ts +++ /dev/null @@ -1,129 +0,0 @@ -/** - * HTTP Server entry point for new architecture. - * - * Usage: bun run src/new-architecture/server.ts [--port PORT] - */ - -import { AnthropicClient, AnthropicLanguageModel } from "@effect/ai-anthropic" -import { GoogleClient, GoogleLanguageModel } from "@effect/ai-google" -import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai" -import { FetchHttpClient, HttpServer } from "@effect/platform" -import { BunContext, BunHttpServer, BunRuntime } from "@effect/platform-bun" -import { ConfigProvider, Effect, Layer, LogLevel, Option } from "effect" -import { AppConfig, type MiniAgentConfig } from "../config.ts" -import { CurrentLlmConfig, getApiKey, type LlmConfig, resolveLlmConfig } from "../llm-config.ts" -import { createLoggingLayer } from "../logging.ts" -import { OpenAiChatClient, OpenAiChatLanguageModel } from "../openai-chat-completions-client.ts" -import { AgentRegistry } from "./agent-registry.ts" -import { EventReducer } from "./event-reducer.ts" -import { EventStoreFileSystem } from "./event-store-fs.ts" -import { makeRouterV2 } from "./http.ts" -import { LlmTurnLive } from "./llm-turn.ts" - -const makeLanguageModelLayer = (llmConfig: LlmConfig) => { - const apiKey = getApiKey(llmConfig) - - switch (llmConfig.apiFormat) { - case "openai-responses": - return OpenAiLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "openai-chat-completions": - return OpenAiChatLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - OpenAiChatClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "anthropic": - return AnthropicLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - AnthropicClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - - case "gemini": - return GoogleLanguageModel.layer({ model: llmConfig.model }).pipe( - Layer.provide( - GoogleClient.layer({ apiKey, apiUrl: llmConfig.baseUrl }).pipe( - Layer.provide(FetchHttpClient.layer) - ) - ) - ) - } -} - -// Parse port from args 
-const port = (() => { - const portIdx = process.argv.indexOf("--port") - if (portIdx !== -1 && process.argv[portIdx + 1]) { - return parseInt(process.argv[portIdx + 1]!, 10) - } - return 3001 -})() - -// Default config for server -const defaultConfig: MiniAgentConfig = { - llm: "openai:gpt-4o-mini", - dataStorageDir: ".mini-agent", - configFile: "mini-agent.config.yaml", - cwd: Option.none(), - stdoutLogLevel: LogLevel.Warning, - fileLogLevel: LogLevel.Debug, - port, - host: "0.0.0.0", - layercodeWebhookSecret: Option.none() -} - -const appConfigLayer = Layer.succeed(AppConfig, defaultConfig) - -const program = Effect.gen(function*() { - const llmConfig = yield* resolveLlmConfig.pipe(Effect.withConfigProvider(ConfigProvider.fromEnv())) - yield* Effect.log(`Starting server on port ${port}`) - yield* Effect.logDebug("Using LLM config", { provider: llmConfig.apiFormat, model: llmConfig.model }) - - const languageModelLayer = makeLanguageModelLayer(llmConfig) - const llmConfigLayer = CurrentLlmConfig.fromConfig(llmConfig) - - // Build the full layer stack - // AgentRegistry.Default requires EventStore, EventReducer, and MiniAgentTurn - const serviceLayer = AgentRegistry.Default.pipe( - Layer.provide(LlmTurnLive), - Layer.provide(languageModelLayer), - Layer.provide(llmConfigLayer), - Layer.provide(EventStoreFileSystem), - Layer.provide(EventReducer.Default), - Layer.provide(appConfigLayer), - Layer.provide(BunContext.layer) - ) - - // HTTP server layer - const serverLayer = HttpServer.serve(makeRouterV2).pipe( - Layer.provide(BunHttpServer.layer({ port })), - Layer.provide(serviceLayer) - ) - - return yield* Layer.launch(serverLayer) -}) - -const loggingLayer = createLoggingLayer({ - stdoutLogLevel: LogLevel.Info, - fileLogLevel: LogLevel.Debug, - baseDir: ".mini-agent" -}) - -const mainLayer = Layer.mergeAll(loggingLayer, BunContext.layer) - -program.pipe( - Effect.provide(mainLayer), - BunRuntime.runMain -) diff --git a/src/server.service.ts b/src/server.service.ts index 18e22ff..9e1ed51 100644 --- a/src/server.service.ts +++ b/src/server.service.ts @@ -4,48 +4,105 @@ * Provides the same abstraction level as the CLI for handling agent requests. * Accepts JSONL events (like script mode) and streams back ContextEvents. 
*/ -import type { AiError, LanguageModel } from "@effect/ai" -import type { Error as PlatformError, FileSystem } from "@effect/platform" -import { Context, Effect, Layer, Schema, Stream } from "effect" -import type { ContextEvent, InputEvent } from "./context.model.ts" -import { SystemPromptEvent, UserMessageEvent } from "./context.model.ts" -import { ContextService } from "./context.service.ts" -import type { ContextLoadError, ContextSaveError } from "./errors.ts" -import type { CurrentLlmConfig } from "./llm-config.ts" +import { Context, Effect, Fiber, Layer, Schema, Stream } from "effect" +import { AgentRegistry } from "./agent-registry.ts" +import { + type AgentName, + type ContextEvent, + type ContextSaveError, + DEFAULT_SYSTEM_PROMPT, + EventBuilder, + type ReducerError +} from "./domain.ts" /** Script mode input events - schema for HTTP parsing */ -export const ScriptInputEvent = Schema.Union(UserMessageEvent, SystemPromptEvent) +export const ScriptInputEvent = Schema.Union( + Schema.Struct({ _tag: Schema.Literal("UserMessage"), content: Schema.String }), + Schema.Struct({ _tag: Schema.Literal("SystemPrompt"), content: Schema.String }) +) export type ScriptInputEvent = typeof ScriptInputEvent.Type +/** Input event type for handleRequest */ +export type InputEvent = ScriptInputEvent + export class AgentServer extends Context.Tag("@app/AgentServer")< AgentServer, { /** * Handle a request with input events, streaming back ContextEvents. * Same semantics as CLI script mode. - * - * Note: The returned stream requires LanguageModel, FileSystem, and CurrentLlmConfig - * to be provided before running. */ readonly handleRequest: ( contextName: string, events: ReadonlyArray<InputEvent> - ) => Stream.Stream< - ContextEvent, - AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError, - LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig - > + ) => Stream.Stream<ContextEvent, ReducerError | ContextSaveError> } >() { static readonly layer = Layer.effect( AgentServer, Effect.gen(function*() { - const contextService = yield* ContextService + const registry = yield* AgentRegistry const handleRequest = ( contextName: string, - events: ReadonlyArray<InputEvent> - ) => contextService.addEvents(contextName, events) + inputEvents: ReadonlyArray<InputEvent> + ): Stream.Stream<ContextEvent, ReducerError | ContextSaveError> => + Stream.asyncScoped((emit) => + Effect.gen(function*() { + const agentName = contextName as AgentName + const agent = yield* registry.getOrCreate(agentName) + + // Check if context needs initialization + const ctx = yield* agent.getReducedContext + if (ctx.messages.length === 0) { + // Check if input events contain a system prompt + const hasSystemPrompt = inputEvents.some((e) => e._tag === "SystemPrompt") + if (!hasSystemPrompt) { + const systemEvent = EventBuilder.systemPrompt( + agentName, + agent.contextName, + ctx.nextEventNumber, + DEFAULT_SYSTEM_PROMPT + ) + yield* agent.addEvent(systemEvent) + } + } + + // Subscribe to agent events BEFORE adding input events + const streamFiber = yield* agent.events.pipe( + Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"), + Stream.tap((event) => Effect.sync(() => emit.single(event))), + Stream.runDrain, + Effect.ensuring(Effect.sync(() => emit.end())), + Effect.fork + ) + + // Add input events to agent + for (const event of inputEvents) { + const currentCtx = yield* agent.getReducedContext + if (event._tag === "UserMessage") { + const userEvent = EventBuilder.userMessage( + agentName, + agent.contextName, + currentCtx.nextEventNumber, + event.content + ) + yield*
agent.addEvent(userEvent) + } else if (event._tag === "SystemPrompt") { + const systemEvent = EventBuilder.systemPrompt( + agentName, + agent.contextName, + currentCtx.nextEventNumber, + event.content + ) + yield* agent.addEvent(systemEvent) + } + } + + // Wait for stream to complete + yield* Fiber.join(streamFiber) + }) + ) return AgentServer.of({ handleRequest }) }) diff --git a/test/cli.e2e.test.ts b/test/cli.e2e.test.ts index cdd168f..56e0a43 100644 --- a/test/cli.e2e.test.ts +++ b/test/cli.e2e.test.ts @@ -112,11 +112,11 @@ describe("CLI", () => { expect(result.stdout.length).toBeGreaterThan(0) - // Context file should exist with random name (chat-xxxxx pattern) - const contextsDir = path.join(testDir, ".mini-agent", "contexts") + // Context file should exist with random name (chat-xxxxx-v1 pattern in contexts-v2) + const contextsDir = path.join(testDir, ".mini-agent", "contexts-v2") const files = fs.readdirSync(contextsDir) expect(files.length).toBe(1) - expect(files[0]).toMatch(/^chat-[a-z0-9]{5}\.yaml$/) + expect(files[0]).toMatch(/^chat-[a-z0-9]{5}-v1\.yaml$/) }) }) @@ -133,7 +133,7 @@ describe("CLI", () => { const jsonOutput = extractJsonOutput(result.stdout) // Should contain JSON with _tag field expect(jsonOutput).toContain("\"_tag\"") - expect(jsonOutput).toContain("\"AssistantMessage\"") + expect(jsonOutput).toContain("\"AssistantMessageEvent\"") }) test("includes ephemeral events with --show-ephemeral", { timeout: 15000 }, async ({ llmEnv, testDir }) => { @@ -147,7 +147,7 @@ describe("CLI", () => { expect(result.exitCode).toBe(0) const jsonOutput = extractJsonOutput(result.stdout) // Should contain TextDelta events when showing ephemeral - expect(jsonOutput).toContain("\"TextDelta\"") + expect(jsonOutput).toContain("\"TextDeltaEvent\"") }) }) @@ -215,7 +215,7 @@ describe("CLI", () => { // Should echo the input event and have AssistantMessage response expect(output).toContain("\"UserMessage\"") - expect(output).toContain("\"AssistantMessage\"") + expect(output).toContain("\"AssistantMessageEvent\"") }) test("handles multiple UserMessage events in sequence", { timeout: 15000 }, async ({ llmEnv, testDir }) => { @@ -239,7 +239,7 @@ describe("CLI", () => { const jsonLines = extractJsonLines(output) // Should have at least two AssistantMessage events (one per input) - const assistantMessages = jsonLines.filter((line) => line.includes("\"AssistantMessage\"")) + const assistantMessages = jsonLines.filter((line) => line.includes("\"AssistantMessageEvent\"")) expect(assistantMessages.length).toBeGreaterThanOrEqual(2) // Second response should mention the secret code @@ -270,7 +270,7 @@ describe("CLI", () => { // Should echo both events expect(output).toContain("\"SystemPrompt\"") expect(output).toContain("\"UserMessage\"") - expect(output).toContain("\"AssistantMessage\"") + expect(output).toContain("\"AssistantMessageEvent\"") }) test("includes TextDelta streaming events by default", { timeout: 15000 }, async ({ llmEnv, testDir }) => { @@ -290,9 +290,9 @@ describe("CLI", () => { ) // Script mode should include TextDelta events (streaming chunks) by default - expect(output).toContain("\"TextDelta\"") + expect(output).toContain("\"TextDeltaEvent\"") expect(output).toContain("\"delta\"") - expect(output).toContain("\"AssistantMessage\"") + expect(output).toContain("\"AssistantMessageEvent\"") }) }) @@ -302,8 +302,8 @@ describe("CLI", () => { runCli(["chat", "-n", TEST_CONTEXT, "-m", "Hello"], { cwd: testDir, env: llmEnv }) ) - // Context file should exist in testDir/.mini-agent/contexts/ 
- const contextPath = path.join(testDir, ".mini-agent", "contexts", `${TEST_CONTEXT}.yaml`) + // Context file should exist in testDir/.mini-agent/contexts-v2/ with -v1 suffix + const contextPath = path.join(testDir, ".mini-agent", "contexts-v2", `${TEST_CONTEXT}-v1.yaml`) expect(fs.existsSync(contextPath)).toBe(true) }) @@ -323,8 +323,8 @@ describe("CLI", () => { expect(result.exitCode).toBe(0) const jsonOutput = extractJsonOutput(result.stdout) - // Response should be JSON with AssistantMessage containing "blue" - expect(jsonOutput).toContain("\"AssistantMessage\"") + // Response should be JSON with AssistantMessageEvent containing "blue" + expect(jsonOutput).toContain("\"AssistantMessageEvent\"") expect(jsonOutput.toLowerCase()).toContain("blue") }) }) diff --git a/test/llm.test.ts b/test/llm.test.ts deleted file mode 100644 index 2d060c5..0000000 --- a/test/llm.test.ts +++ /dev/null @@ -1,169 +0,0 @@ -/** - * LLM Module Tests - */ -import { BunContext } from "@effect/platform-bun" -import { describe, expect, it } from "@effect/vitest" -import { Effect } from "effect" -import { - AssistantMessageEvent, - LLMRequestInterruptedEvent, - SystemPromptEvent, - UserMessageEvent -} from "../src/context.model.ts" -import { eventsToPrompt } from "../src/llm.ts" - -const testLayer = BunContext.layer - -describe("eventsToPrompt", () => { - it.effect("converts basic conversation to prompt", () => - Effect.gen(function*() { - const events = [ - new SystemPromptEvent({ content: "You are helpful" }), - new UserMessageEvent({ content: "Hello" }), - new AssistantMessageEvent({ content: "Hi there!" }), - new UserMessageEvent({ content: "How are you?" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - expect(messages).toHaveLength(4) - expect(messages[0]?.role).toBe("system") - expect(messages[1]?.role).toBe("user") - expect(messages[2]?.role).toBe("assistant") - expect(messages[3]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) - - it.effect("includes interrupted response as assistant message", () => - Effect.gen(function*() { - const events = [ - new SystemPromptEvent({ content: "You are helpful" }), - new UserMessageEvent({ content: "Tell me a story" }), - new LLMRequestInterruptedEvent({ - requestId: "test-123", - reason: "user_cancel", - partialResponse: "Once upon a time, there was a" - }), - new UserMessageEvent({ content: "Continue" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // Should have: system, user, assistant (partial), user (interruption notice), user (new message) - expect(messages).toHaveLength(5) - - expect(messages[0]?.role).toBe("system") - expect(messages[1]?.role).toBe("user") - - // Interrupted response becomes assistant message - const assistantMsg = messages[2] - expect(assistantMsg?.role).toBe("assistant") - if (assistantMsg?.role === "assistant") { - const assistantContent = assistantMsg.content - expect(Array.isArray(assistantContent)).toBe(true) - const firstPart = assistantContent[0] - expect(firstPart?.type).toBe("text") - if (firstPart?.type === "text") { - expect(firstPart.text).toBe("Once upon a time, there was a") - } - } - - // Interruption notice is injected as user message - const noticeMsg = messages[3] - expect(noticeMsg?.role).toBe("user") - if (noticeMsg?.role === "user") { - const noticeContent = noticeMsg.content - expect(Array.isArray(noticeContent)).toBe(true) - const firstPart = noticeContent[0] - expect(firstPart?.type).toBe("text") - if (firstPart?.type === 
"text") { - expect(firstPart.text).toContain("Your previous response was interrupted") - expect(firstPart.text).toContain("Once upon a time, there was a") - } - } - - // New user message - expect(messages[4]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) - - it.effect("handles user_new_message interruption reason", () => - Effect.gen(function*() { - const events = [ - new UserMessageEvent({ content: "Start" }), - new LLMRequestInterruptedEvent({ - requestId: "test-456", - reason: "user_new_message", - partialResponse: "I was saying" - }), - new UserMessageEvent({ content: "New question" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // user, assistant (partial), user (notice), user (new) - expect(messages).toHaveLength(4) - expect(messages[0]?.role).toBe("user") - expect(messages[1]?.role).toBe("assistant") - expect(messages[2]?.role).toBe("user") - expect(messages[3]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) - - it.effect("handles multiple interruptions in sequence", () => - Effect.gen(function*() { - const events = [ - new UserMessageEvent({ content: "First question" }), - new LLMRequestInterruptedEvent({ - requestId: "test-1", - reason: "user_cancel", - partialResponse: "First partial" - }), - new UserMessageEvent({ content: "Second question" }), - new LLMRequestInterruptedEvent({ - requestId: "test-2", - reason: "user_cancel", - partialResponse: "Second partial" - }), - new UserMessageEvent({ content: "Third question" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // user, assistant, user (notice), user, assistant, user (notice), user - expect(messages).toHaveLength(7) - - let userCount = 0 - let assistantCount = 0 - for (const msg of messages) { - if (msg.role === "user") userCount++ - if (msg.role === "assistant") assistantCount++ - } - - expect(userCount).toBe(5) // 3 real + 2 interruption notices - expect(assistantCount).toBe(2) // 2 partial responses - }).pipe(Effect.provide(testLayer))) - - it.effect("skips empty partial response in interruption", () => - Effect.gen(function*() { - // Edge case: interrupted before any content was generated - const events = [ - new UserMessageEvent({ content: "Question" }), - new LLMRequestInterruptedEvent({ - requestId: "test-empty", - reason: "user_cancel", - partialResponse: "" - }), - new UserMessageEvent({ content: "New question" }) - ] - - const prompt = yield* eventsToPrompt(events) - const messages = prompt.content - - // Empty interruption is skipped - only the two user messages remain - expect(messages).toHaveLength(2) - expect(messages[0]?.role).toBe("user") - expect(messages[1]?.role).toBe("user") - }).pipe(Effect.provide(testLayer))) -}) diff --git a/test/server.e2e.test.ts b/test/server.e2e.test.ts index c98540e..d9f8e9c 100644 --- a/test/server.e2e.test.ts +++ b/test/server.e2e.test.ts @@ -156,8 +156,8 @@ describe("HTTP Server", () => { const events = await parseSSE(response) expect(events.length).toBeGreaterThan(0) - // Should have AssistantMessage event - const hasAssistant = events.some((e) => e.includes("\"AssistantMessage\"")) + // Should have AssistantMessageEvent + const hasAssistant = events.some((e) => e.includes("\"AssistantMessageEvent\"")) expect(hasAssistant).toBe(true) } finally { await cleanup() diff --git a/test/services.unit.test.ts b/test/services.unit.test.ts deleted file mode 100644 index 5738a33..0000000 --- a/test/services.unit.test.ts +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Service Unit 
Tests - * - * Tests services using testLayer pattern for isolated unit testing. - * See: https://www.effect.solutions/testing - * - * Pattern: Each test provides a fresh layer so state never leaks between tests. - */ -import { describe, expect, it } from "@effect/vitest" -import { Effect } from "effect" -import { SystemPromptEvent, UserMessageEvent } from "../src/context.model.js" -import { ContextRepository } from "../src/context.repository.js" - -// ============================================================================= -// ContextRepository Tests -// ============================================================================= - -describe("ContextRepository", () => { - describe("load", () => { - it.effect("returns empty array for non-existent context", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const events = yield* repo.load("non-existent") - expect(events).toEqual([]) - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("returns saved events after save", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const events = [ - new SystemPromptEvent({ content: "Test system prompt" }), - new UserMessageEvent({ content: "Hello" }) - ] - - yield* repo.save("test-context", events) - const loaded = yield* repo.load("test-context") - - expect(loaded).toHaveLength(2) - expect(loaded[0]?._tag).toBe("SystemPrompt") - expect(loaded[1]?._tag).toBe("UserMessage") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("loadOrCreate", () => { - it.effect("creates context with system prompt if not exists", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const events = yield* repo.loadOrCreate("new-context") - - expect(events).toHaveLength(1) - expect(events[0]?._tag).toBe("SystemPrompt") - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("returns existing events if context exists", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const initial = [new SystemPromptEvent({ content: "Custom prompt" })] - yield* repo.save("existing", initial) - - const events = yield* repo.loadOrCreate("existing") - - expect(events).toHaveLength(1) - expect(events[0]?._tag).toBe("SystemPrompt") - expect(events[0]?._tag === "SystemPrompt" && events[0].content).toBe("Custom prompt") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("list", () => { - it.effect("returns empty array when no contexts", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const contexts = yield* repo.list() - expect(contexts).toEqual([]) - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("returns sorted context names", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - yield* repo.save("zebra", [new SystemPromptEvent({ content: "z" })]) - yield* repo.save("alpha", [new SystemPromptEvent({ content: "a" })]) - - const contexts = yield* repo.list() - - expect(contexts).toEqual(["alpha", "zebra"]) - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("save", () => { - it.effect("overwrites existing context", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const initial = [new SystemPromptEvent({ content: "First" })] - const updated = [ - new SystemPromptEvent({ content: "Second" }), - new UserMessageEvent({ content: "Hello" }) - ] - - yield* repo.save("overwrite-test", initial) - yield* repo.save("overwrite-test", updated) - - const loaded = yield* 
repo.load("overwrite-test") - expect(loaded).toHaveLength(2) - expect(loaded[0]?._tag === "SystemPrompt" && loaded[0].content).toBe("Second") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) - - describe("getContextsDir", () => { - it.effect("returns test directory path", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - const dir = repo.getContextsDir() - expect(dir).toBe("/test/contexts") - }).pipe(Effect.provide(ContextRepository.testLayer))) - }) -}) - -// ============================================================================= -// State Isolation Tests -// ============================================================================= - -describe("Test Isolation", () => { - it.effect("first test saves data", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - yield* repo.save("isolation-test", [ - new SystemPromptEvent({ content: "First test data" }) - ]) - - const loaded = yield* repo.load("isolation-test") - expect(loaded).toHaveLength(1) - }).pipe(Effect.provide(ContextRepository.testLayer))) - - it.effect("second test has fresh state (no leakage)", () => - Effect.gen(function*() { - const repo = yield* ContextRepository - // Should not see data from first test - const loaded = yield* repo.load("isolation-test") - expect(loaded).toEqual([]) - }).pipe(Effect.provide(ContextRepository.testLayer))) -})