Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/actor-implementation-sketch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ const makeMiniAgent = (agentName: AgentName) =>
* For tests: subscribe before adding events, or use Deferred for coordination.
* ```typescript
* const ready = yield* Deferred.make<void>()
* const fiber = yield* agent.events.pipe(
* const stream = yield* agent.tapEventStream
* const fiber = yield* stream.pipe(
* Stream.tap(() => Deferred.succeed(ready, void 0)),
* Stream.runCollect, Effect.fork
* )
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Reference: **[design.ts](./design.ts)** for complete service interfaces and type
│ addEvent(e) → persist → add to events → reduce → Mailbox.offer │
│ │ │
│ BROADCAST: ▼ │
│ agent.events ◀── Stream.broadcastDynamic ◀──────────┘ │
│ agent.tapEventStream ◀── Stream.broadcastDynamic ◀──┘ │
│ │ │
│ PROCESSING (when event.triggersAgentTurn=true): │
│ └──▶ debounce(100ms) ──▶ MiniAgentTurn.execute(ctx) │
Expand Down
7 changes: 4 additions & 3 deletions docs/design.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,13 @@ export interface MiniAgent {
readonly addEvent: (event: ContextEvent) => Effect.Effect<void, MiniAgentError>

/** LIVE event stream - each call creates new subscriber. Late subscribers miss history. */
readonly events: Stream.Stream<ContextEvent, never>
readonly tapEventStream: Effect.Effect<Stream.Stream<ContextEvent, never>, never, Scope.Scope>

/** Get all events from in-memory state (for historical events) */
readonly getEvents: Effect.Effect<ReadonlyArray<ContextEvent>>

/** Get current derived state */
readonly getReducedContext: Effect.Effect<ReducedContext>
readonly getState: Effect.Effect<ReducedContext>

/** Gracefully shutdown: complete in-flight work, emit SessionEndedEvent, close streams */
readonly shutdown: Effect.Effect<void>
Expand Down Expand Up @@ -461,7 +461,8 @@ export const sampleProgram = Effect.gen(function*() {
const agent = yield* registry.getOrCreate(agentName)

// Subscribe to event stream
const streamFiber = yield* agent.events.pipe(
const eventStream = yield* agent.tapEventStream
const streamFiber = yield* eventStream.pipe(
Stream.tap((event) => Effect.log(`Event: ${event._tag}`)),
Stream.runDrain,
Effect.fork
Expand Down
1 change: 1 addition & 0 deletions src/agent-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export {

// Services
export { AgentRegistry } from "./agent-registry.ts"
export { AgentService } from "./agent-service.ts"
export { EventReducer } from "./event-reducer.ts"
export { EventStore } from "./event-store.ts"

Expand Down
136 changes: 136 additions & 0 deletions src/agent-service-http.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import { Clock, Effect, Layer, Schema, Scope, Stream } from "effect"
import { AgentService, type AgentServiceApi } from "./agent-service.ts"
import { EventReducer } from "./event-reducer.ts"
import {
type AgentName,
type ContextEvent,
ContextEvent as ContextEventSchema
} from "./domain.ts"

const encodeEvent = Schema.encodeSync(ContextEventSchema)
const decodeEvent = Schema.decodeUnknown(ContextEventSchema)

const jsonHeaders = { "Content-Type": "application/json" }

const sanitizeBaseUrl = (url: string) => url.endsWith("/") ? url.slice(0, -1) : url

const parseEventsResponse = (data: unknown): Effect.Effect<ReadonlyArray<ContextEvent>> =>
Effect.sync(() => {
if (typeof data !== "object" || data === null || !("events" in data)) {
throw new Error("Invalid events payload")
}
const eventsData = (data as { events: unknown }).events
if (!Array.isArray(eventsData)) {
throw new Error("Events field must be an array")
}
return eventsData
}).pipe(
Effect.flatMap((eventsData) =>
Effect.forEach(eventsData, (entry) => decodeEvent(entry), { concurrency: "unbounded" })
)
)

export const makeHttpAgentServiceLayer = (baseUrl: string) => {
const normalizedBase = sanitizeBaseUrl(baseUrl)

const fetchJson = (path: string, init?: RequestInit) =>
Effect.tryPromise({
try: async () => {
const response = await fetch(`${normalizedBase}${path}`, init)
if (!response.ok) {
const message = await response.text().catch(() => response.statusText)
throw new Error(`HTTP ${response.status}: ${message}`)
}
if (response.status === 204) {
return null
}
return response.json()
},
catch: (error) => (error instanceof Error ? error : new Error(String(error)))
})

const postJson = (path: string, body?: unknown) =>
fetchJson(path, {
method: "POST",
headers: jsonHeaders,
body: body === undefined ? undefined : JSON.stringify(body)
})

return Layer.effect(
AgentService,
Effect.gen(function*() {
const reducer = yield* EventReducer

const getEvents = ({ agentName }: { readonly agentName: AgentName }) =>
fetchJson(`/agent/${agentName}/events/history`).pipe(
Effect.flatMap(parseEventsResponse)
)

const addEvents = ({ agentName, events }: { readonly agentName: AgentName; readonly events: ReadonlyArray<ContextEvent> }) =>
postJson(`/agent/${agentName}/events`, {
events: events.map((event) => encodeEvent(event)),
streamUntilIdle: false
}).pipe(Effect.asVoid)

const endSession = ({ agentName }: { readonly agentName: AgentName }) =>
postJson(`/agent/${agentName}/end-session`, undefined).pipe(Effect.asVoid)

const interruptTurn = ({ agentName }: { readonly agentName: AgentName }) =>
postJson(`/agent/${agentName}/interrupt`, undefined).pipe(Effect.asVoid)

const isIdle = ({ agentName }: { readonly agentName: AgentName }) =>
fetchJson(`/agent/${agentName}/idle`).pipe(
Effect.map((data) =>
typeof data === "object" && data !== null && "idle" in data ? Boolean((data as { idle: unknown }).idle) : false
)
)

const getState = ({ agentName }: { readonly agentName: AgentName }) =>
getEvents({ agentName }).pipe(
Effect.flatMap((events) => reducer.reduce(reducer.initialReducedContext, events))
)

const tapEventStream: AgentServiceApi["tapEventStream"] = ({ agentName }) =>
Effect.gen(function*() {
const existingEvents = yield* getEvents({ agentName })
let lastEventId = existingEvents.length > 0 ? existingEvents[existingEvents.length - 1]!.id : null

return Stream.asyncScoped<ContextEvent, never, never>((emit) =>
Effect.gen(function*() {
let cancelled = false
yield* Effect.addFinalizer(() => Effect.sync(() => { cancelled = true }))

while (!cancelled) {
yield* Effect.sleep("200 millis")
const events = yield* getEvents({ agentName })
if (events.length === 0) {
continue
}
if (!lastEventId) {
lastEventId = events[events.length - 1]!.id
continue
}
const lastIndex = events.findIndex((event) => event.id === lastEventId)
const startIndex = lastIndex === -1 ? events.length : lastIndex + 1
for (let idx = startIndex; idx < events.length; idx++) {
const event = events[idx]!
lastEventId = event.id
yield* emit.single(event)
}
}
})
)
})

return {
addEvents,
tapEventStream,
getEvents,
getState,
endSession,
interruptTurn,
isIdle
} satisfies AgentServiceApi
})
)
}
79 changes: 79 additions & 0 deletions src/agent-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Clock, Context, Effect, Layer, Scope, Stream } from "effect"
import { AgentRegistry } from "./agent-registry.ts"
import { type AgentName, type ContextEvent, type MiniAgent, type ReducedContext } from "./domain.ts"

export interface AddEventsInput {
readonly agentName: AgentName
readonly events: ReadonlyArray<ContextEvent>
}

export interface TapEventStreamInput {
readonly agentName: AgentName
}

export interface GetEventsInput {
readonly agentName: AgentName
}

export interface AgentServiceApi {
readonly addEvents: (input: AddEventsInput) => Effect.Effect<void>
readonly tapEventStream: (
input: TapEventStreamInput
) => Effect.Effect<Stream.Stream<ContextEvent, never>, never, Scope.Scope | Clock.Clock>
readonly getEvents: (input: GetEventsInput) => Effect.Effect<ReadonlyArray<ContextEvent>>
readonly getState: (input: { readonly agentName: AgentName }) => Effect.Effect<ReducedContext>
readonly endSession: (input: { readonly agentName: AgentName }) => Effect.Effect<void>
readonly interruptTurn: (input: { readonly agentName: AgentName }) => Effect.Effect<void>
readonly isIdle: (input: { readonly agentName: AgentName }) => Effect.Effect<boolean>
}

export const AgentService = Context.Tag<AgentServiceApi>()("@mini-agent/AgentService")

export const AgentServiceLive = Layer.effect(
AgentService,
Effect.gen(function*() {
const registry = yield* AgentRegistry

const withAgent = <A>(
agentName: AgentName,
useAgent: (agent: MiniAgent) => Effect.Effect<A>
): Effect.Effect<A> =>
Effect.gen(function*() {
const agent = yield* registry.getOrCreate(agentName).pipe(Effect.orDie)
return yield* useAgent(agent)
})

const addEvents = ({ agentName, events }: AddEventsInput) =>
withAgent(agentName, (agent) =>
Effect.forEach(events, (event) => agent.addEvent(event), { discard: true })
)

const tapEventStream: AgentServiceApi["tapEventStream"] = ({ agentName }) =>
withAgent(agentName, (agent) => agent.tapEventStream)

const getEvents = ({ agentName }: GetEventsInput) =>
withAgent(agentName, (agent) => agent.getEvents)

const getState = ({ agentName }: { readonly agentName: AgentName }) =>
withAgent(agentName, (agent) => agent.getState)

const endSession = ({ agentName }: { readonly agentName: AgentName }) =>
withAgent(agentName, (agent) => agent.endSession)

const interruptTurn = ({ agentName }: { readonly agentName: AgentName }) =>
withAgent(agentName, (agent) => agent.interruptTurn)

const isIdle = ({ agentName }: { readonly agentName: AgentName }) =>
withAgent(agentName, (agent) => agent.isIdle)

return {
addEvents,
tapEventStream,
getEvents,
getState,
endSession,
interruptTurn,
isIdle
} satisfies AgentServiceApi
})
)
Loading
Loading