Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,23 @@ LLM='{"apiFormat":"openai-chat-completions","model":"my-model","baseUrl":"https:

## Event Types

See [`src/context.model.ts`](src/context.model.ts) for schema definitions.
See [`src/domain.ts`](src/domain.ts) for schema definitions.

**Input Events** (via stdin in script mode):
- `UserMessage` - User message content
- `SystemPrompt` - System behavior configuration
- `FileAttachment` - Image or file attachment
- `UserMessage` - User message content (output as `UserMessageEvent`)
- `SystemPrompt` - System behavior configuration (output as `SystemPromptEvent`)

**Output Events**:
- `TextDelta` - Streaming chunk (ephemeral)
- `AssistantMessage` - Complete response (persisted)

**Internal Events** (persisted):
- `SetLlmConfig` - LLM configuration for context
- `LLMRequestInterrupted` - Partial response on cancellation
- `TextDeltaEvent` - Streaming chunk (ephemeral)
- `AssistantMessageEvent` - Complete response (persisted)
- `AgentTurnStartedEvent` - LLM turn started
- `AgentTurnCompletedEvent` - LLM turn completed
- `AgentTurnInterruptedEvent` - Turn interrupted (partial response)

**Lifecycle Events** (persisted):
- `SessionStartedEvent` - Agent session started
- `SessionEndedEvent` - Agent session ended
- `SetLlmConfigEvent` - LLM configuration for context

## Script Mode

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ export class AgentRegistry extends Effect.Service<AgentRegistry>()("@mini-agent/
Effect.map((map) => Array.from(map.keys()))
)

// List all contexts from EventStore (includes persisted contexts not yet loaded)
const listContexts: Effect.Effect<ReadonlyArray<AgentName>> = Effect.gen(function*() {
const contextNames = yield* store.list()
// Convert context names back to agent names (remove "-v1" suffix)
return contextNames.map((cn) => cn.replace(/-v1$/, "") as AgentName)
})

const shutdownAgent = (agentName: AgentName): Effect.Effect<void, AgentNotFoundError> =>
Effect.gen(function*() {
const current = yield* Ref.get(agents)
Expand Down Expand Up @@ -190,6 +197,7 @@ export class AgentRegistry extends Effect.Service<AgentRegistry>()("@mini-agent/
getOrCreate,
get,
list,
listContexts,
shutdownAgent,
shutdownAll
}
Expand Down
176 changes: 114 additions & 62 deletions src/cli/chat-ui.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@
* Interactive chat with interruptible LLM streaming.
* Return during streaming interrupts (with optional new message); Escape exits.
*/
import type { AiError, LanguageModel } from "@effect/ai"
import type { Error as PlatformError, FileSystem } from "@effect/platform"
import { Cause, Context, Effect, Fiber, Layer, Mailbox, Stream } from "effect"
import type { Error as PlatformError } from "@effect/platform"
import { Cause, Context, Effect, Fiber, Layer, Mailbox, Option, Stream } from "effect"
import { is } from "effect/Schema"
import { AgentRegistry } from "../agent-registry.ts"
import {
type AgentName,
type AgentTurnNumber,
AssistantMessageEvent,
type ContextEvent,
LLMRequestInterruptedEvent,
TextDeltaEvent,
UserMessageEvent
} from "../context.model.ts"
import { ContextService } from "../context.service.ts"
import type { ContextLoadError, ContextSaveError } from "../errors.ts"
import type { CurrentLlmConfig } from "../llm-config.ts"
import { streamLLMResponse } from "../llm.ts"
import { type ChatController, runOpenTUIChat } from "./components/opentui-chat.tsx"
type ContextName,
type ContextSaveError,
DEFAULT_SYSTEM_PROMPT,
EventBuilder,
type ReducerError,
TextDeltaEvent
} from "../domain.ts"
import { type ChatController, type DisplayEvent, runOpenTUIChat } from "./components/opentui-chat.tsx"

type ChatSignal =
| { readonly _tag: "Input"; readonly text: string }
Expand All @@ -32,23 +33,59 @@ export class ChatUI extends Context.Tag("@app/ChatUI")<
contextName: string
) => Effect.Effect<
void,
AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError,
LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig
PlatformError.PlatformError | ReducerError | ContextSaveError,
AgentRegistry
>
}
>() {
static readonly layer = Layer.effect(
ChatUI,
Effect.gen(function*() {
const contextService = yield* ContextService
const registry = yield* AgentRegistry

const runChat = Effect.fn("ChatUI.runChat")(function*(contextName: string) {
const existingEvents = yield* contextService.load(contextName)
const agentName = contextName as AgentName
const agent = yield* registry.getOrCreate(agentName)

// Check if context needs initialization
const ctx = yield* agent.getReducedContext
if (ctx.messages.length === 0) {
const systemEvent = EventBuilder.systemPrompt(
agentName,
agent.contextName,
ctx.nextEventNumber,
DEFAULT_SYSTEM_PROMPT
)
yield* agent.addEvent(systemEvent)
}

// Get events for display
const existingEvents = yield* agent.getEvents

// Convert domain events to display events for the UI
const displayEvents: Array<DisplayEvent> = existingEvents
.map((e): DisplayEvent | null => {
switch (e._tag) {
case "UserMessageEvent":
return { _tag: "UserMessage", content: e.content }
case "AssistantMessageEvent":
return { _tag: "AssistantMessage", content: e.content }
case "SystemPromptEvent":
return { _tag: "SystemPrompt", content: e.content }
case "AgentTurnInterruptedEvent":
return Option.isSome(e.partialResponse)
? { _tag: "LLMRequestInterrupted", partialResponse: e.partialResponse.value, reason: e.reason }
: null
default:
return null
}
})
.filter((e): e is DisplayEvent => e !== null)

const mailbox = yield* Mailbox.make<ChatSignal>()

const chat = yield* Effect.promise(() =>
runOpenTUIChat(contextName, existingEvents, {
runOpenTUIChat(contextName, displayEvents, {
onSubmit: (text) => {
mailbox.unsafeOffer({ _tag: "Input", text })
},
Expand All @@ -58,7 +95,7 @@ export class ChatUI extends Context.Tag("@app/ChatUI")<
})
)

yield* runChatLoop(contextName, contextService, chat, mailbox).pipe(
yield* runChatLoop(agentName, agent, chat, mailbox).pipe(
Effect.catchAll((error) => Effect.logError("Chat error", { error }).pipe(Effect.as(undefined))),
Effect.catchAllCause((cause) =>
Cause.isInterruptedOnly(cause) ? Effect.void : Effect.logError("Chat loop error", cause)
Expand All @@ -74,19 +111,22 @@ export class ChatUI extends Context.Tag("@app/ChatUI")<
static readonly testLayer = Layer.sync(ChatUI, () => ChatUI.of({ runChat: () => Effect.void }))
}

interface AgentInterface {
readonly addEvent: (event: ContextEvent) => Effect.Effect<void, ReducerError | ContextSaveError>
readonly events: Stream.Stream<ContextEvent, never>
readonly getReducedContext: Effect.Effect<{ nextEventNumber: number; currentTurnNumber: AgentTurnNumber }, never>
readonly contextName: ContextName
}

const runChatLoop = (
contextName: string,
contextService: Context.Tag.Service<typeof ContextService>,
agentName: AgentName,
agent: AgentInterface,
chat: ChatController,
mailbox: Mailbox.Mailbox<ChatSignal>
): Effect.Effect<
void,
AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError,
LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig
> =>
): Effect.Effect<void, ReducerError | ContextSaveError> =>
Effect.fn("ChatUI.runChatLoop")(function*() {
while (true) {
const result = yield* runChatTurn(contextName, contextService, chat, mailbox, null)
const result = yield* runChatTurn(agentName, agent, chat, mailbox, null)
if (result._tag === "exit") {
return
}
Expand All @@ -98,16 +138,12 @@ type TurnResult =
| { readonly _tag: "exit" }

const runChatTurn = (
contextName: string,
contextService: Context.Tag.Service<typeof ContextService>,
agentName: AgentName,
agent: AgentInterface,
chat: ChatController,
mailbox: Mailbox.Mailbox<ChatSignal>,
pendingMessage: string | null
): Effect.Effect<
TurnResult,
AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError,
LanguageModel.LanguageModel | FileSystem.FileSystem | CurrentLlmConfig
> =>
): Effect.Effect<TurnResult, ReducerError | ContextSaveError> =>
Effect.fn("ChatUI.runChatTurn")(function*() {
// Get message either from pending or by waiting for input
let userMessage: string
Expand All @@ -128,31 +164,35 @@ const runChatTurn = (
return { _tag: "continue" } as const
}

const userEvent = new UserMessageEvent({ content: userMessage })
const ctx = yield* agent.getReducedContext

yield* contextService.persistEvent(contextName, userEvent)
chat.addEvent(userEvent)
// Create and add user event
const userEvent = EventBuilder.userMessage(
agentName,
agent.contextName,
ctx.nextEventNumber,
userMessage
)
yield* agent.addEvent(userEvent)

// Show in UI
chat.addEvent({ _tag: "UserMessage", content: userMessage })

const events = yield* contextService.load(contextName)
let accumulatedText = ""

const streamFiber = yield* Effect.fork(
streamLLMResponse(events).pipe(
agent.events.pipe(
Stream.takeUntil((e) => e._tag === "AgentTurnCompletedEvent" || e._tag === "AgentTurnFailedEvent"),
Stream.tap((event: ContextEvent) =>
Effect.sync(() => {
if (is(TextDeltaEvent)(event)) {
accumulatedText += event.delta
chat.addEvent(event)
chat.addEvent({ _tag: "TextDelta", delta: event.delta })
} else if (is(AssistantMessageEvent)(event)) {
chat.addEvent({ _tag: "AssistantMessage", content: event.content })
}
})
),
Stream.filter(is(AssistantMessageEvent)),
Stream.tap((event) =>
Effect.gen(function*() {
yield* contextService.persistEvent(contextName, event)
chat.addEvent(event)
})
),
Stream.runDrain
)
)
Expand All @@ -165,30 +205,42 @@ const runChatTurn = (

if (result._tag === "exit") {
if (accumulatedText.length > 0) {
const interruptedEvent = new LLMRequestInterruptedEvent({
requestId: crypto.randomUUID(),
reason: "user_cancel",
partialResponse: accumulatedText
})
yield* contextService.persistEvent(contextName, interruptedEvent)
chat.addEvent(interruptedEvent)
const interruptedCtx = yield* agent.getReducedContext
const interruptedEvent = EventBuilder.agentTurnInterrupted(
agentName,
agent.contextName,
interruptedCtx.nextEventNumber,
interruptedCtx.currentTurnNumber,
"user_cancel",
accumulatedText
)
yield* agent.addEvent(interruptedEvent).pipe(Effect.catchAll(() => Effect.void))
chat.addEvent({ _tag: "LLMRequestInterrupted", partialResponse: accumulatedText, reason: "user_cancel" })
}
return { _tag: "exit" } as const
}

// result._tag === "interrupted" - user hit return during streaming
if (accumulatedText.length > 0) {
const interruptedEvent = new LLMRequestInterruptedEvent({
requestId: crypto.randomUUID(),
reason: result.newMessage ? "user_new_message" : "user_cancel",
partialResponse: accumulatedText
const interruptedCtx = yield* agent.getReducedContext
const interruptedEvent = EventBuilder.agentTurnInterrupted(
agentName,
agent.contextName,
interruptedCtx.nextEventNumber,
interruptedCtx.currentTurnNumber,
result.newMessage ? "user_new_message" : "user_cancel",
accumulatedText
)
yield* agent.addEvent(interruptedEvent).pipe(Effect.catchAll(() => Effect.void))
chat.addEvent({
_tag: "LLMRequestInterrupted",
partialResponse: accumulatedText,
reason: result.newMessage ? "user_new_message" : "user_cancel"
})
yield* contextService.persistEvent(contextName, interruptedEvent)
chat.addEvent(interruptedEvent)
}

if (result.newMessage) {
return yield* runChatTurn(contextName, contextService, chat, mailbox, result.newMessage)
return yield* runChatTurn(agentName, agent, chat, mailbox, result.newMessage)
}

return { _tag: "continue" } as const
Expand All @@ -200,9 +252,9 @@ type StreamResult =
| { readonly _tag: "interrupted"; readonly newMessage: string | null }

const awaitStreamCompletion = (
fiber: Fiber.RuntimeFiber<void, AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError>,
fiber: Fiber.RuntimeFiber<void, ReducerError | ContextSaveError>,
mailbox: Mailbox.Mailbox<ChatSignal>
): Effect.Effect<StreamResult, AiError.AiError | PlatformError.PlatformError | ContextLoadError | ContextSaveError> =>
): Effect.Effect<StreamResult, ReducerError | ContextSaveError> =>
Effect.fn("ChatUI.awaitStreamCompletion")(function*() {
const waitForFiber = Fiber.join(fiber).pipe(Effect.as({ _tag: "completed" } as StreamResult))
const waitForInterrupt = Effect.gen(function*() {
Expand Down
Loading