diff --git a/src/browser/components/AIView.tsx b/src/browser/components/AIView.tsx index b950e02fa1..0a0cd20b38 100644 --- a/src/browser/components/AIView.tsx +++ b/src/browser/components/AIView.tsx @@ -62,7 +62,7 @@ import { ConcurrentLocalWarning } from "./ConcurrentLocalWarning"; import { BackgroundProcessesBanner } from "./BackgroundProcessesBanner"; import { useBackgroundBashHandlers } from "@/browser/hooks/useBackgroundBashHandlers"; import { checkAutoCompaction } from "@/browser/utils/compaction/autoCompactionCheck"; -import { executeCompaction } from "@/browser/utils/chatCommands"; + import { useProviderOptions } from "@/browser/hooks/useProviderOptions"; import { useAutoCompactionSettings } from "../hooks/useAutoCompactionSettings"; import { useSendMessageOptions } from "@/browser/hooks/useSendMessageOptions"; @@ -224,13 +224,21 @@ const AIViewInner: React.FC = ({ // We pass a default continueMessage of "Continue" as a resume sentinel so the backend can // auto-send it after compaction. The compaction prompt builder special-cases this sentinel // to avoid injecting it into the summarization request. + // Uses "force-compaction" source to distinguish from user-initiated /compact. const handleForceCompaction = useCallback(() => { if (!api) return; - void executeCompaction({ - api, + // Use compactHistory endpoint with interrupt to ensure immediate compaction + void api.workspace.compactHistory({ workspaceId, - sendMessageOptions: pendingSendOptions, + source: "force-compaction", + interrupt: "abort", continueMessage: { text: "Continue" }, + sendMessageOptions: { + model: pendingSendOptions.model, + thinkingLevel: pendingSendOptions.thinkingLevel, + providerOptions: pendingSendOptions.providerOptions, + experiments: pendingSendOptions.experiments, + }, }); }, [api, workspaceId, pendingSendOptions]); diff --git a/src/browser/hooks/useIdleCompactionHandler.test.ts b/src/browser/hooks/useIdleCompactionHandler.test.ts index 9e6aa4f554..05fc09355f 100644 --- a/src/browser/hooks/useIdleCompactionHandler.test.ts +++ b/src/browser/hooks/useIdleCompactionHandler.test.ts @@ -31,51 +31,27 @@ void mock.module("@/browser/stores/WorkspaceStore", () => ({ void mock.module("@/browser/hooks/useSendMessageOptions", () => ({ buildSendMessageOptions: () => ({ model: "test-model", - gateway: "anthropic", + thinkingLevel: undefined, + providerOptions: undefined, + experiments: undefined, }), })); -// Mock executeCompaction - tracks calls and can be configured per test -let executeCompactionCalls: Array<{ - api: unknown; - workspaceId: string; - sendMessageOptions: unknown; - source: string; -}> = []; -let executeCompactionResult: { success: true } | { success: false; error: string } = { +// Mock workspace.compactHistory - tracks calls and can be configured per test +let compactHistoryResolver: ((value: unknown) => void) | null = null; +let compactHistoryResult: + | { success: true; data: { operationId: string } } + | { success: false; error: unknown } = { success: true, + data: { operationId: "op-1" }, }; -let executeCompactionResolver: - | ((value: { success: true } | { success: false; error: string }) => void) - | null = null; - -void mock.module("@/browser/utils/chatCommands", () => ({ - executeCompaction: (opts: { - api: unknown; - workspaceId: string; - sendMessageOptions: unknown; - source: string; - }) => { - executeCompactionCalls.push(opts); - if (executeCompactionResolver) { - // Return a promise that hangs until manually resolved - return new Promise((resolve) => { - const savedResolver = 
executeCompactionResolver; - executeCompactionResolver = (val) => { - savedResolver?.(val); - resolve(val); - }; - }); - } - return Promise.resolve(executeCompactionResult); - }, -})); // Import after mocks are set up import { useIdleCompactionHandler } from "./useIdleCompactionHandler"; describe("useIdleCompactionHandler", () => { let mockApi: object; + let compactHistoryMock: ReturnType; let unsubscribeCalled: boolean; beforeEach(() => { @@ -83,16 +59,29 @@ describe("useIdleCompactionHandler", () => { globalThis.window = new GlobalWindow() as unknown as Window & typeof globalThis; globalThis.document = globalThis.window.document; - mockApi = { workspace: { sendMessage: mock() } }; + compactHistoryMock = mock((_args: unknown) => { + if (compactHistoryResolver) { + // Return a promise that hangs until manually resolved + return new Promise((resolve) => { + const savedResolver = compactHistoryResolver; + compactHistoryResolver = (val) => { + savedResolver?.(val); + resolve(val); + }; + }); + } + return Promise.resolve(compactHistoryResult); + }); + + mockApi = { workspace: { compactHistory: compactHistoryMock } }; unsubscribeCalled = false; mockUnsubscribe = () => { unsubscribeCalled = true; }; capturedCallback = null; onIdleCompactionNeededCallCount = 0; - executeCompactionCalls = []; - executeCompactionResult = { success: true }; - executeCompactionResolver = null; + compactHistoryResult = { success: true, data: { operationId: "op-1" } }; + compactHistoryResolver = null; }); afterEach(() => { @@ -122,7 +111,7 @@ describe("useIdleCompactionHandler", () => { expect(onIdleCompactionNeededCallCount).toBe(0); }); - test("calls executeCompaction when event received", async () => { + test("calls workspace.compactHistory when event received", async () => { renderHook(() => useIdleCompactionHandler({ api: mockApi as never })); expect(capturedCallback).not.toBeNull(); @@ -130,20 +119,25 @@ describe("useIdleCompactionHandler", () => { // Wait for async execution await Promise.resolve(); + await Promise.resolve(); // Extra tick for .then() - expect(executeCompactionCalls).toHaveLength(1); - expect(executeCompactionCalls[0]).toEqual({ - api: mockApi, + expect(compactHistoryMock.mock.calls).toHaveLength(1); + expect(compactHistoryMock.mock.calls[0][0]).toEqual({ workspaceId: "workspace-123", - sendMessageOptions: { model: "test-model", gateway: "anthropic" }, source: "idle-compaction", + sendMessageOptions: { + model: "test-model", + thinkingLevel: undefined, + providerOptions: undefined, + experiments: undefined, + }, }); }); test("prevents duplicate triggers for same workspace while in-flight", async () => { - // Make executeCompaction hang until we resolve it - this no-op will be replaced when promise is created + // Make compactHistory hang until we resolve it - this no-op will be replaced when promise is created // eslint-disable-next-line @typescript-eslint/no-empty-function - executeCompactionResolver = () => {}; + compactHistoryResolver = () => {}; renderHook(() => useIdleCompactionHandler({ api: mockApi as never })); @@ -156,11 +150,12 @@ describe("useIdleCompactionHandler", () => { await Promise.resolve(); // Should only have called once - expect(executeCompactionCalls).toHaveLength(1); + expect(compactHistoryMock.mock.calls).toHaveLength(1); // Resolve the first compaction - executeCompactionResolver({ success: true }); + compactHistoryResolver({ success: true, data: { operationId: "op-1" } }); await Promise.resolve(); + await Promise.resolve(); // Extra tick for .finally() }); test("allows 
different workspaces to compact simultaneously", async () => { @@ -170,7 +165,7 @@ describe("useIdleCompactionHandler", () => { capturedCallback!("workspace-2"); await Promise.resolve(); - expect(executeCompactionCalls).toHaveLength(2); + expect(compactHistoryMock.mock.calls).toHaveLength(2); }); test("clears workspace from triggered set after success", async () => { @@ -181,18 +176,19 @@ describe("useIdleCompactionHandler", () => { await Promise.resolve(); await Promise.resolve(); // Extra tick for .then() - expect(executeCompactionCalls).toHaveLength(1); + expect(compactHistoryMock.mock.calls).toHaveLength(1); + await Promise.resolve(); // Extra tick for .finally() // Should be able to trigger again after completion capturedCallback!("workspace-123"); await Promise.resolve(); - expect(executeCompactionCalls).toHaveLength(2); + expect(compactHistoryMock.mock.calls).toHaveLength(2); }); test("clears workspace from triggered set after failure", async () => { // Make first call fail - executeCompactionResult = { success: false, error: "test error" }; + compactHistoryResult = { success: false, error: "test error" }; // Suppress console.error for this test const originalError = console.error; @@ -205,13 +201,14 @@ describe("useIdleCompactionHandler", () => { await Promise.resolve(); await Promise.resolve(); // Extra tick for .then() - expect(executeCompactionCalls).toHaveLength(1); + expect(compactHistoryMock.mock.calls).toHaveLength(1); + await Promise.resolve(); // Extra tick for .finally() // Should be able to trigger again after failure capturedCallback!("workspace-123"); await Promise.resolve(); - expect(executeCompactionCalls).toHaveLength(2); + expect(compactHistoryMock.mock.calls).toHaveLength(2); console.error = originalError; }); diff --git a/src/browser/hooks/useIdleCompactionHandler.ts b/src/browser/hooks/useIdleCompactionHandler.ts index 1873b9ab7f..16c07eca6e 100644 --- a/src/browser/hooks/useIdleCompactionHandler.ts +++ b/src/browser/hooks/useIdleCompactionHandler.ts @@ -4,18 +4,13 @@ * The backend's IdleCompactionService detects when workspaces have been idle * for a configured period and emits `idle-compaction-needed` events to the stream. * - * This hook listens for these signals and triggers compaction via the frontend's - * executeCompaction(), which handles gateway, model preferences, etc. - * - * Status display is handled data-driven: the compaction request message includes - * displayStatus metadata, which the aggregator reads to set sidebar status. - * Status is cleared when the summary message with compacted: "idle" arrives. + * This hook listens for these signals and triggers compaction via the control-plane + * compactHistory endpoint, which ensures the compaction cannot be dropped or queued. */ import { useEffect, useRef } from "react"; import type { RouterClient } from "@orpc/server"; import type { AppRouter } from "@/node/orpc/router"; -import { executeCompaction } from "@/browser/utils/chatCommands"; import { buildSendMessageOptions } from "@/browser/hooks/useSendMessageOptions"; import { workspaceStore } from "@/browser/stores/WorkspaceStore"; @@ -47,22 +42,33 @@ export function useIdleCompactionHandler(params: IdleCompactionHandlerParams): v // Use buildSendMessageOptions to get correct model, gateway, thinking level, etc. 
const sendMessageOptions = buildSendMessageOptions(workspaceId); - // Status is handled data-driven via displayStatus in the message metadata - void executeCompaction({ - api, - workspaceId, - sendMessageOptions, - source: "idle-compaction", - }).then((result) => { - if (!result.success) { - console.error("Idle compaction failed:", result.error); - } - // Always clear from triggered set after completion (success or failure). - // This allows the workspace to be re-triggered on subsequent hourly checks - // if it becomes idle again. Backend eligibility checks (already_compacted, - // currently_streaming) provide authoritative deduplication. - triggeredWorkspacesRef.current.delete(workspaceId); - }); + // Use control-plane compactHistory endpoint for reliability + void api.workspace + .compactHistory({ + workspaceId, + source: "idle-compaction", + sendMessageOptions: { + model: sendMessageOptions.model, + thinkingLevel: sendMessageOptions.thinkingLevel, + providerOptions: sendMessageOptions.providerOptions, + experiments: sendMessageOptions.experiments, + }, + }) + .then((result) => { + if (!result.success) { + console.error("Idle compaction failed:", result.error); + } + }) + .catch((error) => { + console.error("Idle compaction error:", error); + }) + .finally(() => { + // Always clear from triggered set after completion (success or failure). + // This allows the workspace to be re-triggered on subsequent hourly checks + // if it becomes idle again. Backend eligibility checks (already_compacted, + // currently_streaming) provide authoritative deduplication. + triggeredWorkspacesRef.current.delete(workspaceId); + }); }; const unsubscribe = workspaceStore.onIdleCompactionNeeded(handleIdleCompactionNeeded); diff --git a/src/browser/utils/chatCommands.ts b/src/browser/utils/chatCommands.ts index f7264c9528..e6b3529820 100644 --- a/src/browser/utils/chatCommands.ts +++ b/src/browser/utils/chatCommands.ts @@ -690,20 +690,45 @@ export function prepareCompactionMessage(options: CompactionOptions): { } /** - * Execute a compaction command + * Execute a compaction command via the control-plane endpoint. + * This ensures compaction cannot be dropped or treated as a normal message. */ export async function executeCompaction( options: CompactionOptions & { api: RouterClient } ): Promise { - const { messageText, metadata, sendOptions } = prepareCompactionMessage(options); + // Resolve compaction model preference + const effectiveModel = resolveCompactionModel(options.model); + + // Map source to control-plane format + const source: "user" | "force-compaction" | "idle-compaction" = + options.source === "idle-compaction" ? "idle-compaction" : "user"; + + // Build continue message if provided + const continueMode = options.continueMessage?.mode ?? "exec"; + const continueMessage = options.continueMessage + ? { + text: options.continueMessage.text, + imageParts: options.continueMessage.imageParts, + model: options.continueMessage.model ?? options.sendMessageOptions.model, + mode: continueMode, + } + : undefined; - const result = await options.api.workspace.sendMessage({ + // Call the control-plane compactHistory endpoint + const result = await options.api.workspace.compactHistory({ workspaceId: options.workspaceId, - message: messageText, - options: { - ...sendOptions, - muxMetadata: metadata, - editMessageId: options.editMessageId, + model: effectiveModel, + maxOutputTokens: options.maxOutputTokens, + continueMessage, + source, + // For edits, interrupt any active stream before compacting. 
+ // We preserve partial output; compaction will commit partial.json before summarizing. + interrupt: options.editMessageId ? "abort" : undefined, + sendMessageOptions: { + model: options.sendMessageOptions.model, + thinkingLevel: options.sendMessageOptions.thinkingLevel, + providerOptions: options.sendMessageOptions.providerOptions, + experiments: options.sendMessageOptions.experiments, }, }); diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index 8228209a06..55b46f4c56 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -332,6 +332,45 @@ export const workspace = { }), output: ResultSchema(z.void(), z.string()), }, + /** + * Compact history: control-plane endpoint for compaction. + * Atomically interrupts any active stream (if requested) and starts compaction. + * Returns immediately after compaction starts; completion is signaled via chat-event. + */ + compactHistory: { + input: z.object({ + workspaceId: z.string(), + /** Model to use for compaction (defaults to workspace's current model) */ + model: z.string().optional(), + /** Max output tokens for compaction summary */ + maxOutputTokens: z.number().optional(), + /** Message to auto-send after compaction completes */ + continueMessage: z + .object({ + text: z.string(), + imageParts: z.array(ImagePartSchema).optional(), + model: z.string().optional(), + mode: z.enum(["exec", "plan"]).optional(), + }) + .optional(), + /** Source of the compaction request for telemetry/behavior */ + source: z.enum(["user", "force-compaction", "idle-compaction"]).optional(), + /** How to handle an active stream: none (fail if streaming), graceful (wait), abort (interrupt immediately) */ + interrupt: z.enum(["none", "graceful", "abort"]).optional(), + /** Send message options (model, thinking level, etc.) 
for compaction stream */ + sendMessageOptions: SendMessageOptionsSchema.omit({ + editMessageId: true, + muxMetadata: true, + }).optional(), + }), + output: ResultSchema( + z.object({ + /** Unique ID for this compaction operation (can be used to correlate events) */ + operationId: z.string(), + }), + SendMessageErrorSchema + ), + }, replaceChatHistory: { input: z.object({ workspaceId: z.string(), diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index fd69479797..06dd5888dd 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -755,6 +755,23 @@ export const router = (authToken?: string) => { } return { success: true, data: undefined }; }), + compactHistory: t + .input(schemas.workspace.compactHistory.input) + .output(schemas.workspace.compactHistory.output) + .handler(async ({ context, input }) => { + const result = await context.workspaceService.compactHistory(input.workspaceId, { + model: input.model, + maxOutputTokens: input.maxOutputTokens, + continueMessage: input.continueMessage, + source: input.source, + interrupt: input.interrupt, + sendMessageOptions: input.sendMessageOptions, + }); + if (!result.success) { + return { success: false, error: result.error }; + } + return { success: true, data: result.data }; + }), replaceChatHistory: t .input(schemas.workspace.replaceChatHistory.input) .output(schemas.workspace.replaceChatHistory.output) diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 0d5afcddec..e9e1c36c86 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -40,6 +40,11 @@ import { TURNS_BETWEEN_ATTACHMENTS } from "@/common/constants/attachments"; import { extractEditedFileDiffs } from "@/common/utils/messages/extractEditedFiles"; import { isValidModelFormat } from "@/common/utils/ai/models"; import { modeToToolPolicy } from "@/common/utils/ui/modeUtils"; +import { + buildCompactionPrompt, + DEFAULT_COMPACTION_WORD_TARGET, + WORDS_TO_TOKENS_RATIO, +} from "@/common/constants/ui"; /** * Tracked file state for detecting external edits. @@ -142,6 +147,23 @@ export class AgentSession { */ private postCompactionContextEnabled = false; + /** + * Active compaction operation (control-plane initiated). + * Used by CompactionHandler to detect compaction completion instead of scanning history. + */ + private activeCompactionOperation: { + operationId: string; + /** Stream messageId for the compaction summary stream (set from stream-start) */ + streamMessageId: string | null; + source: "user" | "force-compaction" | "idle-compaction"; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; + } | null = null; + constructor(options: AgentSessionOptions) { assert(options, "AgentSession requires options"); const { @@ -178,6 +200,10 @@ export class AgentSession { partialService: this.partialService, emitter: this.emitter, onCompactionComplete, + getActiveCompactionOperation: () => this.activeCompactionOperation, + clearActiveCompactionOperation: () => { + this.activeCompactionOperation = null; + }, }); this.attachAiListeners(); @@ -576,6 +602,192 @@ export class AgentSession { return this.streamWithHistory(model, options); } + /** + * Start a compaction operation (control-plane initiated). + * Unlike sendMessage with muxMetadata, this does not persist a user message. + * Compaction completion is detected via session state, not history scanning. 
+ */ + async startCompaction(options: { + operationId: string; + model?: string; + maxOutputTokens?: number; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; + source: "user" | "force-compaction" | "idle-compaction"; + sendMessageOptions?: Omit; + }): Promise> { + this.assertNotDisposed("startCompaction"); + + // Validate model if provided + const model = options.model ?? options.sendMessageOptions?.model; + if (!model || model.trim().length === 0) { + return Err( + createUnknownSendMessageError("No model specified for compaction. Please select a model.") + ); + } + if (!isValidModelFormat(model)) { + return Err({ + type: "invalid_model_string", + message: `Invalid model string format: "${model}". Expected "provider:model-id"`, + }); + } + + // Set active compaction operation (used by CompactionHandler) + this.activeCompactionOperation = { + operationId: options.operationId, + streamMessageId: null, + source: options.source, + continueMessage: options.continueMessage, + }; + + // Clean up background processes (they won't be in the summary) + await this.backgroundProcessManager.cleanup(this.workspaceId); + + if (this.disposed) { + this.activeCompactionOperation = null; + return Ok(undefined); + } + + // Queue continue message if provided + if (options.continueMessage) { + const { finalText, metadata } = prepareUserMessageForSend(options.continueMessage); + const continueMode = options.continueMessage.mode ?? "exec"; + const sanitizedOptions: Omit< + SendMessageOptions, + "muxMetadata" | "mode" | "editMessageId" | "imageParts" | "maxOutputTokens" + > & { + imageParts?: typeof options.continueMessage.imageParts; + muxMetadata?: typeof metadata; + } = { + model: options.continueMessage.model ?? model, + thinkingLevel: options.sendMessageOptions?.thinkingLevel, + toolPolicy: modeToToolPolicy(continueMode), + additionalSystemInstructions: options.sendMessageOptions?.additionalSystemInstructions, + providerOptions: options.sendMessageOptions?.providerOptions, + experiments: options.sendMessageOptions?.experiments, + }; + + if (options.continueMessage.imageParts?.length) { + sanitizedOptions.imageParts = options.continueMessage.imageParts; + } + if (metadata) { + sanitizedOptions.muxMetadata = metadata; + } + + this.messageQueue.add(finalText, sanitizedOptions); + this.emitQueuedMessageChanged(); + } + + // Build compaction prompt + const targetWords = options.maxOutputTokens + ? 
Math.round(options.maxOutputTokens / WORDS_TO_TOKENS_RATIO) + : DEFAULT_COMPACTION_WORD_TARGET; + let compactionPrompt = buildCompactionPrompt(targetWords); + + // If there's a non-default continue message, add context + if (options.continueMessage && options.continueMessage.text.trim() !== "Continue") { + compactionPrompt += `\n\nThe user wants to continue with: ${options.continueMessage.text}`; + } + + // Commit any pending partial + const commitResult = await this.partialService.commitToHistory(this.workspaceId); + if (!commitResult.success) { + this.activeCompactionOperation = null; + return Err(createUnknownSendMessageError(commitResult.error)); + } + + // Get history for compaction + const historyResult = await this.historyService.getHistory(this.workspaceId); + if (!historyResult.success) { + this.activeCompactionOperation = null; + return Err(createUnknownSendMessageError(historyResult.error)); + } + + if (historyResult.data.length === 0) { + this.activeCompactionOperation = null; + return Err(createUnknownSendMessageError("Cannot compact: workspace history is empty.")); + } + + // Build compaction messages (history + compaction prompt as user message) + // Tool policy for compaction: disable all tools + const disableAllTools = [{ regex_match: ".*", action: "disable" as const }]; + + const compactionUserMessage = createMuxMessage( + `compact-prompt-${options.operationId}`, + "user", + compactionPrompt, + { + timestamp: Date.now(), + toolPolicy: disableAllTools, + } + ); + + const compactionMessages = [...historyResult.data, compactionUserMessage]; + + // Enforce thinking policy + const effectiveThinkingLevel = options.sendMessageOptions?.thinkingLevel + ? enforceThinkingPolicy(model, options.sendMessageOptions.thinkingLevel) + : undefined; + + // Stream compaction (no history persistence for the prompt message) + const abortSignal = undefined; + const additionalSystemInstructions = undefined; + const maxOutputTokens = options.maxOutputTokens; + const muxProviderOptions = options.sendMessageOptions?.providerOptions; + const mode = undefined; + const recordFileState = undefined; + const changedFileAttachments = undefined; + const postCompactionAttachments = undefined; + const experiments = options.sendMessageOptions?.experiments; + + return this.aiService.streamMessage( + compactionMessages, + this.workspaceId, + model, + effectiveThinkingLevel, + disableAllTools, + abortSignal, + additionalSystemInstructions, + maxOutputTokens, + muxProviderOptions, + mode, + recordFileState, + changedFileAttachments, + postCompactionAttachments, + experiments + ); + } + + /** + * Get the active compaction operation, if any. + * Used by CompactionHandler to detect compaction completion. + */ + getActiveCompactionOperation(): { + operationId: string; + streamMessageId: string | null; + source: "user" | "force-compaction" | "idle-compaction"; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; + } | null { + return this.activeCompactionOperation; + } + + /** + * Clear the active compaction operation. + * Called by CompactionHandler after compaction completes. 
+ */ + clearActiveCompactionOperation(): void { + this.activeCompactionOperation = null; + } + async interruptStream(options?: { soft?: boolean; abandonPartial?: boolean; @@ -689,7 +901,16 @@ export class AgentSession { this.aiService.on(event, wrapped as never); }; - forward("stream-start", (payload) => this.emitChatEvent(payload)); + forward("stream-start", (payload) => { + // Bind the active compaction operation to the specific stream messageId. + // This prevents unrelated streams from being misinterpreted as compaction. + const activeOperation = this.activeCompactionOperation; + if (activeOperation?.streamMessageId === null && payload.type === "stream-start") { + activeOperation.streamMessageId = payload.messageId; + } + + this.emitChatEvent(payload); + }); forward("stream-delta", (payload) => this.emitChatEvent(payload)); forward("tool-call-start", (payload) => this.emitChatEvent(payload)); forward("bash-output", (payload) => this.emitChatEvent(payload)); @@ -714,7 +935,18 @@ export class AgentSession { forward("reasoning-delta", (payload) => this.emitChatEvent(payload)); forward("reasoning-end", (payload) => this.emitChatEvent(payload)); forward("usage-delta", (payload) => this.emitChatEvent(payload)); - forward("stream-abort", (payload) => this.emitChatEvent(payload)); + forward("stream-abort", (payload) => { + // If the compaction stream was aborted, clear the active compaction operation so + // subsequent stream-end events cannot replace history. + if ( + this.activeCompactionOperation && + payload.type === "stream-abort" && + this.activeCompactionOperation.streamMessageId === payload.messageId + ) { + this.activeCompactionOperation = null; + } + this.emitChatEvent(payload); + }); forward("stream-end", async (payload) => { const handled = await this.compactionHandler.handleCompletion(payload as StreamEndEvent); @@ -745,6 +977,15 @@ export class AgentSession { error: string; errorType?: string; }; + // If the compaction stream errored, clear the active compaction operation so + // subsequent stream-end events cannot replace history. + if ( + this.activeCompactionOperation && + this.activeCompactionOperation.streamMessageId === data.messageId + ) { + this.activeCompactionOperation = null; + } + const streamError: StreamErrorMessage = { type: "stream-error", messageId: data.messageId, @@ -858,6 +1099,16 @@ export class AgentSession { return; } + // If a compaction operation is active, do NOT auto-send queued messages. + // + // Why: if an earlier stream (the one we interrupted to start compaction) emits a late + // stream-end/tool-call-end event, it can trigger sendQueuedMessages() while compaction is + // still streaming. That can cause the queued continue message to be sent too early and + // then wiped by the compaction history replacement. 
+ if (this.activeCompactionOperation) { + return; + } + // Clear the queued message flag (even if queue is empty, to handle race conditions) this.backgroundProcessManager.setMessageQueued(this.workspaceId, false); diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index 5a49aeb8e3..30f2b93f4b 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, beforeEach, mock } from "bun:test"; -import { CompactionHandler } from "./compactionHandler"; +import { CompactionHandler, type ActiveCompactionOperation } from "./compactionHandler"; import type { HistoryService } from "./historyService"; import type { PartialService } from "./partialService"; import type { EventEmitter } from "events"; @@ -22,30 +22,26 @@ interface ChatEventData { const createMockHistoryService = () => { let getHistoryResult: Result = Ok([]); - let clearHistoryResult: Result = Ok([]); - let appendToHistoryResult: Result = Ok(undefined); + let replaceHistoryResult: Result = Ok([]); const getHistory = mock((_) => Promise.resolve(getHistoryResult)); - const clearHistory = mock((_) => Promise.resolve(clearHistoryResult)); - const appendToHistory = mock((_, __) => Promise.resolve(appendToHistoryResult)); + const replaceHistory = mock((_, __) => Promise.resolve(replaceHistoryResult)); + + // Unused in compaction tests, but kept for interface compatibility const updateHistory = mock(() => Promise.resolve(Ok(undefined))); const truncateAfterMessage = mock(() => Promise.resolve(Ok(undefined))); return { getHistory, - clearHistory, - appendToHistory, + replaceHistory, updateHistory, truncateAfterMessage, // Allow setting mock return values mockGetHistory: (result: Result) => { getHistoryResult = result; }, - mockClearHistory: (result: Result) => { - clearHistoryResult = result; - }, - mockAppendToHistory: (result: Result) => { - appendToHistoryResult = result; + mockReplaceHistory: (result: Result) => { + replaceHistoryResult = result; }, }; }; @@ -81,12 +77,21 @@ const createMockEmitter = (): { emitter: EventEmitter; events: EmittedEvent[] } return { emitter: emitter as EventEmitter, events }; }; -const createCompactionRequest = (id = "req-1"): MuxMessage => - createMuxMessage(id, "user", "Please summarize the conversation", { - historySequence: 0, - muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} }, +/** Helper: create a normal user message (not compaction) */ +const createNormalUserMessage = (id = "msg-1", historySequence = 0): MuxMessage => + createMuxMessage(id, "user", "Hello, how are you?", { + historySequence, + muxMetadata: { type: "normal" }, }); +/** Helper: create a valid long summary (>=50 words) */ +const createValidSummary = (): string => + "This is a comprehensive summary of the conversation. The user wanted to build a feature for their application. " + + "We discussed the requirements and architecture. Key decisions included using TypeScript for type safety, " + + "implementing a control-plane pattern for reliability, and adding validation for data integrity. " + + "The implementation is now complete with proper error handling and tests. 
" + + "Next steps involve deployment and monitoring of the new feature in production."; + const createStreamEndEvent = ( summary: string, metadata?: Record @@ -106,12 +111,11 @@ const createStreamEndEvent = ( // DRY helper to set up successful compaction scenario const setupSuccessfulCompaction = ( mockHistoryService: ReturnType, - messages: MuxMessage[] = [createCompactionRequest()], - clearedSequences?: number[] + messages: MuxMessage[] = [createNormalUserMessage()], + deletedSequences?: number[] ) => { mockHistoryService.mockGetHistory(Ok(messages)); - mockHistoryService.mockClearHistory(Ok(clearedSequences ?? messages.map((_, i) => i))); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + mockHistoryService.mockReplaceHistory(Ok(deletedSequences ?? messages.map((_, i) => i))); }; describe("CompactionHandler", () => { @@ -122,6 +126,8 @@ describe("CompactionHandler", () => { let telemetryCapture: ReturnType; let telemetryService: TelemetryService; let emittedEvents: EmittedEvent[]; + let activeOperation: ActiveCompactionOperation | null; + let clearActiveOperationCalled: boolean; const workspaceId = "test-workspace"; beforeEach(() => { @@ -137,44 +143,73 @@ describe("CompactionHandler", () => { mockHistoryService = createMockHistoryService(); mockPartialService = createMockPartialService(); + // Default: no active compaction operation + activeOperation = null; + clearActiveOperationCalled = false; + handler = new CompactionHandler({ workspaceId, historyService: mockHistoryService as unknown as HistoryService, telemetryService, partialService: mockPartialService as unknown as PartialService, emitter: mockEmitter, + getActiveCompactionOperation: () => activeOperation, + clearActiveCompactionOperation: () => { + clearActiveOperationCalled = true; + activeOperation = null; + }, }); }); - describe("handleCompletion() - Normal Compaction Flow", () => { - it("should return false when no compaction request found", async () => { - const normalMsg = createMuxMessage("msg1", "user", "Hello", { - historySequence: 0, - muxMetadata: { type: "normal" }, - }); - mockHistoryService.mockGetHistory(Ok([normalMsg])); + /** Helper to set an active compaction operation */ + + const getReplaceHistoryCalls = (): Array<[string, MuxMessage[]]> => { + return mockHistoryService.replaceHistory.mock.calls as unknown as Array<[string, MuxMessage[]]>; + }; + + const getReplacedMessage = (): MuxMessage => { + const calls = getReplaceHistoryCalls(); + const [, messages] = calls[0]; + return messages[0]; + }; + const setActiveOperation = ( + operationId: string, + source: "user" | "force-compaction" | "idle-compaction" = "user", + streamMessageId: string | null = "msg-id" + ) => { + activeOperation = { operationId, streamMessageId, source }; + }; - const event = createStreamEndEvent("Summary"); + describe("handleCompletion() - Control-plane Compaction", () => { + it("should ignore stream-end events that do not match the compaction stream messageId", async () => { + // Active operation, but bound to a different stream + setActiveOperation("op-1", "user", "different-message-id"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); + + const event = createStreamEndEvent(createValidSummary()); const result = await handler.handleCompletion(event); + // Not treated as compaction expect(result).toBe(false); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(0); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); }); + it("should return false when no active 
compaction operation", async () => { + // No active operation set + const msg = createNormalUserMessage(); + mockHistoryService.mockGetHistory(Ok([msg])); - it("should return false when historyService fails", async () => { - mockHistoryService.mockGetHistory(Err("Database error")); - - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); const result = await handler.handleCompletion(event); expect(result).toBe(false); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); }); it("should capture compaction_completed telemetry on successful compaction", async () => { - const compactionReq = createCompactionRequest(); - setupSuccessfulCompaction(mockHistoryService, [compactionReq]); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary", { + const event = createStreamEndEvent(createValidSummary(), { duration: 1500, // Prefer contextUsage (context size) over total usage. contextUsage: { inputTokens: 1000, outputTokens: 333, totalTokens: undefined }, @@ -202,102 +237,69 @@ describe("CompactionHandler", () => { }); it("should return true when successful", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Complete summary"); + const event = createStreamEndEvent(createValidSummary()); const result = await handler.handleCompletion(event); expect(result).toBe(true); + expect(clearActiveOperationCalled).toBe(true); }); - it("should join multiple text parts from event.parts", async () => { - const compactionReq = createCompactionRequest(); - setupSuccessfulCompaction(mockHistoryService, [compactionReq]); - - // Create event with multiple text parts - const event: StreamEndEvent = { - type: "stream-end", - workspaceId: "test-workspace", - messageId: "msg-id", - parts: [ - { type: "text", text: "Part 1 " }, - { type: "text", text: "Part 2 " }, - { type: "text", text: "Part 3" }, - ], - metadata: { - model: "claude-3-5-sonnet-20241022", - usage: { inputTokens: 100, outputTokens: 50, totalTokens: undefined }, - duration: 1500, - }, - }; - await handler.handleCompletion(event); - - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( - "Part 1 Part 2 Part 3" - ); - }); - - it("should extract summary text from event.parts", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should extract and store summary text from event.parts", async () => { + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("This is the summary"); + const validSummary = createValidSummary(); + const event = createStreamEndEvent(validSummary); await handler.handleCompletion(event); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( - "This is the summary" - 
); + const replacedMsg = getReplacedMessage(); + expect((replacedMsg.parts[0] as { type: "text"; text: string }).text).toBe(validSummary); }); it("should delete partial.json before clearing history (race condition fix)", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); // deletePartial should be called once before clearHistory expect(mockPartialService.deletePartial.mock.calls).toHaveLength(1); expect(mockPartialService.deletePartial.mock.calls[0][0]).toBe(workspaceId); - - // Verify deletePartial was called (we can't easily verify order without more complex mocking, - // but the important thing is that it IS called during compaction) }); - it("should call clearHistory() and appendToHistory()", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should call replaceHistory()", async () => { + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary"); + const validSummary = createValidSummary(); + const event = createStreamEndEvent(validSummary); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.clearHistory.mock.calls[0][0]).toBe(workspaceId); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.appendToHistory.mock.calls[0][0]).toBe(workspaceId); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect(appendedMsg.role).toBe("assistant"); - expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe("Summary"); + const calls = getReplaceHistoryCalls(); + expect(calls).toHaveLength(1); + expect(calls[0][0]).toBe(workspaceId); + const replacedMsg = getReplacedMessage(); + expect(replacedMsg.role).toBe("assistant"); + expect((replacedMsg.parts[0] as { type: "text"; text: string }).text).toBe(validSummary); }); it("should emit delete event for old messages", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1, 2, 3])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory( + Ok([ + createNormalUserMessage("msg-0", 0), + createNormalUserMessage("msg-1", 1), + createNormalUserMessage("msg-2", 2), + createNormalUserMessage("msg-3", 3), + ]) + ); + mockHistoryService.mockReplaceHistory(Ok([])); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const deleteEvent = emittedEvents.find( @@ -309,13 +311,11 @@ describe("CompactionHandler", () => { }); it("should emit summary message with complete metadata", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - 
mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); const usage = { inputTokens: 200, outputTokens: 100, totalTokens: 300 }; - const event = createStreamEndEvent("Summary", { + const event = createStreamEndEvent(createValidSummary(), { model: "claude-3-5-sonnet-20241022", usage, duration: 2000, @@ -342,12 +342,10 @@ describe("CompactionHandler", () => { }); it("should emit stream-end event to frontend", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary", { duration: 1234 }); + const event = createStreamEndEvent(createValidSummary(), { duration: 1234 }); await handler.handleCompletion(event); const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event); @@ -358,57 +356,94 @@ describe("CompactionHandler", () => { }); it("should set compacted in summary metadata", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect(appendedMsg.metadata?.compacted).toBe("user"); + const replacedMsg = getReplacedMessage(); + expect(replacedMsg.metadata?.compacted).toBe("user"); + }); + }); + + describe("handleCompletion() - Summary Validation", () => { + it("should reject empty summary and not clear history", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + + const event = createStreamEndEvent(""); + const result = await handler.handleCompletion(event); + + expect(result).toBe(true); // Still returns true (was a compaction attempt) + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); + expect(clearActiveOperationCalled).toBe(true); + }); + + it("should reject summary that is too short (<10 words)", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + + // Only 9 words + const event = createStreamEndEvent("one two three four five six seven eight nine"); + const result = await handler.handleCompletion(event); + + expect(result).toBe(true); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); + expect(clearActiveOperationCalled).toBe(true); + }); + + it("should accept summary at minimum word count", async () => { + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); + + // Exactly 50 words + const fiftyWords = Array(50).fill("word").join(" "); + const event = createStreamEndEvent(fiftyWords); + const result = await handler.handleCompletion(event); + + expect(result).toBe(true); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); }); 
describe("handleCompletion() - Deduplication", () => { - it("should track processed compaction-request IDs", async () => { - const compactionReq = createCompactionRequest("req-unique"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should track processed operation IDs", async () => { + setActiveOperation("op-unique"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); - it("should return true without re-processing when same request ID seen twice", async () => { - const compactionReq = createCompactionRequest("req-dupe"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should return true without re-processing when same operation ID seen twice", async () => { + setActiveOperation("op-dupe"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); const result1 = await handler.handleCompletion(event); + + // Re-set operation since it was cleared + setActiveOperation("op-dupe"); const result2 = await handler.handleCompletion(event); expect(result1).toBe(true); expect(result2).toBe(true); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); it("should not emit duplicate events", async () => { - const compactionReq = createCompactionRequest("req-dupe-2"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-dupe-2"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const eventCountAfterFirst = emittedEvents.length; + // Re-set operation since it was cleared + setActiveOperation("op-dupe-2"); await handler.handleCompletion(event); const eventCountAfterSecond = emittedEvents.length; @@ -416,77 +451,74 @@ describe("CompactionHandler", () => { }); it("should not clear history twice", async () => { - const compactionReq = createCompactionRequest("req-dupe-3"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-dupe-3"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); + + // Re-set operation since it was cleared + setActiveOperation("op-dupe-3"); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); + 
expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); }); describe("Error Handling", () => { - it("should return false when clearHistory() fails", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Clear failed")); + it("should return true but not replace history when replaceHistory() fails", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Err("Replace failed")); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); const result = await handler.handleCompletion(event); - expect(result).toBe(false); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(0); - }); - - it("should return false when appendToHistory() fails", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Err("Append failed")); - - const event = createStreamEndEvent("Summary"); - const result = await handler.handleCompletion(event); + // Returns true because it was a compaction attempt (even though it failed) + expect(result).toBe(true); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); + expect(clearActiveOperationCalled).toBe(true); - expect(result).toBe(false); + // Should not emit summary/delete events on failure + const summaryEvent = emittedEvents.find((_e) => { + const m = _e.data.message as MuxMessage | undefined; + return m?.role === "assistant" && m?.parts !== undefined; + }); + expect(summaryEvent).toBeUndefined(); }); it("should log errors but not throw", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Database corruption")); + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Err("Database corruption")); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); // Should not throw const result = await handler.handleCompletion(event); - expect(result).toBe(false); + expect(result).toBe(true); }); - it("should not emit events when compaction fails mid-process", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Clear failed")); + it("should emit stream-end even when compaction fails", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Err("Replace failed")); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); - expect(emittedEvents).toHaveLength(0); + // stream-end should be emitted so UI updates + const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event); + expect(streamEndEvent).toBeDefined(); }); }); describe("Event Emission", () => { it("should include workspaceId in all chat-event emissions", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - 
-      mockHistoryService.mockClearHistory(Ok([0]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+      setActiveOperation("op-1");
+      setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]);
-      const event = createStreamEndEvent("Summary");
+      const event = createStreamEndEvent(createValidSummary());
       await handler.handleCompletion(event);
       const chatEvents = emittedEvents.filter((e) => e.event === "chat-event");
@@ -497,12 +529,17 @@ describe("CompactionHandler", () => {
     });
     it("should emit DeleteMessage with correct type and historySequences array", async () => {
-      const compactionReq = createCompactionRequest();
-      mockHistoryService.mockGetHistory(Ok([compactionReq]));
-      mockHistoryService.mockClearHistory(Ok([5, 10, 15]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+      setActiveOperation("op-1");
+      mockHistoryService.mockGetHistory(
+        Ok([
+          createNormalUserMessage("msg-5", 5),
+          createNormalUserMessage("msg-10", 10),
+          createNormalUserMessage("msg-15", 15),
+        ])
+      );
+      mockHistoryService.mockReplaceHistory(Ok([]));
-      const event = createStreamEndEvent("Summary");
+      const event = createStreamEndEvent(createValidSummary());
       await handler.handleCompletion(event);
       const deleteEvent = emittedEvents.find(
@@ -515,12 +552,11 @@ describe("CompactionHandler", () => {
     });
     it("should emit summary message with proper MuxMessage structure", async () => {
-      const compactionReq = createCompactionRequest();
-      mockHistoryService.mockGetHistory(Ok([compactionReq]));
-      mockHistoryService.mockClearHistory(Ok([0]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+      setActiveOperation("op-1");
+      setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]);
-      const event = createStreamEndEvent("Summary text");
+      const validSummary = createValidSummary();
+      const event = createStreamEndEvent(validSummary);
       await handler.handleCompletion(event);
       const summaryEvent = emittedEvents.find((_e) => {
@@ -532,7 +568,7 @@ describe("CompactionHandler", () => {
       expect(summaryMsg).toMatchObject({
         id: expect.stringContaining("summary-") as string,
         role: "assistant",
-        parts: [{ type: "text", text: "Summary text" }],
+        parts: [{ type: "text", text: validSummary }],
         metadata: expect.objectContaining({
           compacted: "user",
           muxMetadata: { type: "normal" },
@@ -541,12 +577,10 @@ describe("CompactionHandler", () => {
     });
     it("should forward stream events (stream-end, stream-abort) correctly", async () => {
-      const compactionReq = createCompactionRequest();
-      mockHistoryService.mockGetHistory(Ok([compactionReq]));
-      mockHistoryService.mockClearHistory(Ok([0]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+      setActiveOperation("op-1");
+      setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]);
-      const event = createStreamEndEvent("Summary", { customField: "test" });
+      const event = createStreamEndEvent(createValidSummary(), { customField: "test" });
       await handler.handleCompletion(event);
       const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event);
@@ -563,21 +597,12 @@ describe("CompactionHandler", () => {
         timestamp: originalTimestamp,
         historySequence: 0,
       });
-      const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", {
-        historySequence: 1,
-        muxMetadata: {
-          type: "compaction-request",
-          source: "idle-compaction",
-          rawCommand: "/compact",
-          parsed: {},
-        },
-      });
-      mockHistoryService.mockGetHistory(Ok([userMessage, idleCompactionReq]));
-      mockHistoryService.mockClearHistory(Ok([0, 1]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+      setActiveOperation("op-idle", "idle-compaction");
+      mockHistoryService.mockGetHistory(Ok([userMessage]));
+      mockHistoryService.mockReplaceHistory(Ok([0]));
-      const event = createStreamEndEvent("Summary");
+      const event = createStreamEndEvent(createValidSummary());
       await handler.handleCompletion(event);
       const summaryEvent = emittedEvents.find((_e) => {
@@ -597,21 +622,12 @@ describe("CompactionHandler", () => {
         compacted: "user",
         historySequence: 0,
       });
-      const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", {
-        historySequence: 1,
-        muxMetadata: {
-          type: "compaction-request",
-          source: "idle-compaction",
-          rawCommand: "/compact",
-          parsed: {},
-        },
-      });
-      mockHistoryService.mockGetHistory(Ok([compactedMessage, idleCompactionReq]));
-      mockHistoryService.mockClearHistory(Ok([0, 1]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+      setActiveOperation("op-idle", "idle-compaction");
+      mockHistoryService.mockGetHistory(Ok([compactedMessage]));
+      mockHistoryService.mockReplaceHistory(Ok([0]));
-      const event = createStreamEndEvent("Summary");
+      const event = createStreamEndEvent(createValidSummary());
       await handler.handleCompletion(event);
       const summaryEvent = emittedEvents.find((_e) => {
@@ -635,21 +651,12 @@ describe("CompactionHandler", () => {
         timestamp: newerUserTimestamp,
         historySequence: 1,
       });
-      const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", {
-        historySequence: 2,
-        muxMetadata: {
-          type: "compaction-request",
-          source: "idle-compaction",
-          rawCommand: "/compact",
-          parsed: {},
-        },
-      });
-      mockHistoryService.mockGetHistory(Ok([compactedMessage, userMessage, idleCompactionReq]));
-      mockHistoryService.mockClearHistory(Ok([0, 1, 2]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+      setActiveOperation("op-idle", "idle-compaction");
+      mockHistoryService.mockGetHistory(Ok([compactedMessage, userMessage]));
+      mockHistoryService.mockReplaceHistory(Ok([0, 1]));
-      const event = createStreamEndEvent("Summary");
+      const event = createStreamEndEvent(createValidSummary());
       await handler.handleCompletion(event);
       const summaryEvent = emittedEvents.find((_e) => {
@@ -662,57 +669,20 @@ describe("CompactionHandler", () => {
       expect(summaryMsg.metadata?.timestamp).toBe(newerUserTimestamp);
     });
-    it("should skip compaction-request message when finding timestamp to preserve", async () => {
-      const originalTimestamp = Date.now() - 3600 * 1000; // 1 hour ago - the real user message
-      const freshTimestamp = Date.now(); // The compaction request has a fresh timestamp
-      const userMessage = createMuxMessage("user-1", "user", "Hello", {
-        timestamp: originalTimestamp,
-        historySequence: 0,
-      });
-      // Idle compaction request WITH a timestamp (as happens in production)
-      const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", {
-        timestamp: freshTimestamp,
-        historySequence: 1,
-        muxMetadata: {
-          type: "compaction-request",
-          source: "idle-compaction",
-          rawCommand: "/compact",
-          parsed: {},
-        },
-      });
-
-      mockHistoryService.mockGetHistory(Ok([userMessage, idleCompactionReq]));
-      mockHistoryService.mockClearHistory(Ok([0, 1]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
-
-      const event = createStreamEndEvent("Summary");
-      await handler.handleCompletion(event);
-
-      const summaryEvent = emittedEvents.find((_e) => {
-        const m = _e.data.message as MuxMessage | undefined;
-        return m?.role === "assistant" && m?.metadata?.compacted;
-      });
-      expect(summaryEvent).toBeDefined();
-      const summaryMsg = summaryEvent?.data.message as MuxMessage;
-      // Should use the OLD user message timestamp, NOT the fresh compaction request timestamp
-      expect(summaryMsg.metadata?.timestamp).toBe(originalTimestamp);
-      expect(summaryMsg.metadata?.compacted).toBe("idle");
-    });
-
-    it("should use current time for non-idle compaction", async () => {
+    it("should use current time for non-idle compaction (user source)", async () => {
       const oldTimestamp = Date.now() - 3600 * 1000; // 1 hour ago
       const userMessage = createMuxMessage("user-1", "user", "Hello", {
         timestamp: oldTimestamp,
         historySequence: 0,
       });
-      // Regular compaction (not idle)
-      const compactionReq = createCompactionRequest();
-      mockHistoryService.mockGetHistory(Ok([userMessage, compactionReq]));
-      mockHistoryService.mockClearHistory(Ok([0, 1]));
-      mockHistoryService.mockAppendToHistory(Ok(undefined));
+
+      // Regular compaction (not idle) - uses "user" source
+      setActiveOperation("op-user", "user");
+      mockHistoryService.mockGetHistory(Ok([userMessage]));
+      mockHistoryService.mockReplaceHistory(Ok([0]));
       const beforeTime = Date.now();
-      const event = createStreamEndEvent("Summary");
+      const event = createStreamEndEvent(createValidSummary());
       await handler.handleCompletion(event);
       const afterTime = Date.now();
diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts
index 62cbeeae51..2d222ecfdf 100644
--- a/src/node/services/compactionHandler.ts
+++ b/src/node/services/compactionHandler.ts
@@ -3,7 +3,7 @@
 import type { HistoryService } from "./historyService";
 import type { PartialService } from "./partialService";
 import type { StreamEndEvent } from "@/common/types/stream";
-import type { WorkspaceChatMessage, DeleteMessage } from "@/common/orpc/types";
+import type { WorkspaceChatMessage, DeleteMessage, ImagePart } from "@/common/orpc/types";
 import type { Result } from "@/common/types/result";
 import { Ok, Err } from "@/common/types/result";
 import type { LanguageModelV2Usage } from "@ai-sdk/provider";
@@ -18,6 +18,23 @@ import {
 } from "@/common/utils/messages/extractEditedFiles";
 import { computeRecencyFromMessages } from "@/common/utils/recency";
 
+/** Minimum word count for a valid compaction summary */
+const MIN_SUMMARY_WORDS = 10;
+
+/** Active compaction operation tracked via session state (control-plane) */
+export interface ActiveCompactionOperation {
+  operationId: string;
+  /** Stream messageId for the compaction summary stream (set from stream-start) */
+  streamMessageId: string | null;
+  source: "user" | "force-compaction" | "idle-compaction";
+  continueMessage?: {
+    text: string;
+    imageParts?: ImagePart[];
+    model?: string;
+    mode?: "exec" | "plan";
+  };
+}
+
 interface CompactionHandlerOptions {
   workspaceId: string;
   historyService: HistoryService;
@@ -26,15 +43,24 @@ interface CompactionHandlerOptions {
   emitter: EventEmitter;
   /** Called when compaction completes successfully (e.g., to clear idle compaction pending state) */
   onCompactionComplete?: () => void;
+  /** Get active compaction operation from session state (control-plane) */
+  getActiveCompactionOperation: () => ActiveCompactionOperation | null;
+  /** Clear active compaction operation after completion */
+  clearActiveCompactionOperation: () => void;
 }
 
 /**
  * Handles history compaction for agent sessions
  *
  * Responsible for:
- * - Detecting compaction requests in stream events
- * - Replacing chat history with compacted summaries
+ * - Detecting compaction operations via session state
+ * - Replacing chat history with compacted summaries (only on successful completion)
  * - Preserving cumulative usage across compactions
+ *
+ * IMPORTANT: History is only replaced when:
+ * 1. An active compaction operation exists in session state
+ * 2. The stream completed successfully (stream-end, not stream-abort/error)
+ * 3. The summary text is valid (non-empty, meets minimum length)
  */
 export class CompactionHandler {
   private readonly workspaceId: string;
@@ -44,6 +70,8 @@ export class CompactionHandler {
   private readonly emitter: EventEmitter;
   private readonly processedCompactionRequestIds: Set<string> = new Set();
   private readonly onCompactionComplete?: () => void;
+  private readonly getActiveCompactionOperation: () => ActiveCompactionOperation | null;
+  private readonly clearActiveCompactionOperation: () => void;
   /** Flag indicating post-compaction attachments should be generated on next turn */
   private postCompactionAttachmentsPending = false;
@@ -57,6 +85,8 @@
     this.telemetryService = options.telemetryService;
     this.emitter = options.emitter;
     this.onCompactionComplete = options.onCompactionComplete;
+    this.getActiveCompactionOperation = options.getActiveCompactionOperation;
+    this.clearActiveCompactionOperation = options.clearActiveCompactionOperation;
   }
 
   /**
@@ -86,55 +116,120 @@
   }
 
   /**
-   * Handle compaction stream completion
+   * Handle compaction stream completion.
+   *
+   * Only processes compaction if there's an active operation in session state.
+   * This ensures compaction can only be triggered via the control-plane
+   * (compactHistory endpoint), not by user messages with special metadata.
    *
-   * Detects when a compaction stream finishes, extracts the summary,
-   * and performs history replacement atomically.
+   * @returns true if this was a compaction stream, false otherwise
    */
   async handleCompletion(event: StreamEndEvent): Promise<boolean> {
-    // Check if the last user message is a compaction-request
-    const historyResult = await this.historyService.getHistory(this.workspaceId);
-    if (!historyResult.success) {
+    const activeOperation = this.getActiveCompactionOperation();
+    if (!activeOperation) {
+      // No active compaction - this is a normal stream completion
       return false;
     }
 
-    const messages = historyResult.data;
-    const lastUserMsg = [...messages].reverse().find((m) => m.role === "user");
-    const isCompaction = lastUserMsg?.metadata?.muxMetadata?.type === "compaction-request";
-
-    if (!isCompaction || !lastUserMsg) {
+    // Only treat this stream-end as compaction if it belongs to the compaction stream.
+    // This prevents unrelated stream-end events (e.g., a previous stream finishing after a
+    // graceful stop) from replacing history.
+    if (activeOperation.streamMessageId !== event.messageId) {
       return false;
     }
 
-    // Dedupe: If we've already processed this compaction-request, skip
-    if (this.processedCompactionRequestIds.has(lastUserMsg.id)) {
+    // Dedupe by operationId (prevents double-processing on reconnect)
+    if (this.processedCompactionRequestIds.has(activeOperation.operationId)) {
+      log.debug("Skipping already-processed compaction operation", {
+        operationId: activeOperation.operationId,
+      });
       return true;
     }
 
+    // Extract summary text from stream parts
     const summary = event.parts
      .filter((part): part is { type: "text"; text: string } => part.type === "text")
      .map((part) => part.text)
      .join("");
 
-    // Check if this was an idle-compaction (auto-triggered due to inactivity)
-    const muxMeta = lastUserMsg.metadata?.muxMetadata;
-    const isIdleCompaction =
-      muxMeta?.type === "compaction-request" && muxMeta.source === "idle-compaction";
+    // Validate summary before replacing history
+    const validationResult = this.validateSummary(summary);
+    if (!validationResult.valid) {
+      log.error("Compaction summary validation failed:", {
+        operationId: activeOperation.operationId,
+        reason: validationResult.reason,
+        summaryLength: summary.length,
+      });
+      this.clearActiveCompactionOperation();
+      // Emit stream-end so UI updates, but history remains unchanged
+      this.emitChatEvent(event);
+      return true;
+    }
+
+    // Get current history for compaction
+    const historyResult = await this.historyService.getHistory(this.workspaceId);
+    if (!historyResult.success) {
+      log.error("Failed to get history for compaction:", historyResult.error);
+      this.clearActiveCompactionOperation();
+      this.emitChatEvent(event);
+      return true;
+    }
 
     // Mark as processed before performing compaction
-    this.processedCompactionRequestIds.add(lastUserMsg.id);
+    this.processedCompactionRequestIds.add(activeOperation.operationId);
 
+    const isIdleCompaction = activeOperation.source === "idle-compaction";
     const result = await this.performCompaction(
       summary,
       event.metadata,
-      messages,
+      historyResult.data,
       isIdleCompaction
     );
+
     if (!result.success) {
-      log.error("Compaction failed:", result.error);
-      return false;
+      log.error("Compaction failed:", {
+        operationId: activeOperation.operationId,
+        error: result.error,
+      });
+      this.clearActiveCompactionOperation();
+      this.emitChatEvent(event);
+      return true;
+    }
+
+    // Success - capture telemetry and notify
+    this.captureCompactionTelemetry(event, isIdleCompaction ? "idle" : "manual");
+    this.clearActiveCompactionOperation();
+    this.onCompactionComplete?.();
+
+    // Emit stream-end so UI knows compaction completed
+    this.emitChatEvent(event);
+    return true;
+  }
+
+  /**
+   * Validate that a summary is suitable for replacing history.
+   * Prevents data loss from empty or truncated summaries.
+   */
+  private validateSummary(summary: string): { valid: true } | { valid: false; reason: string } {
+    if (!summary || summary.trim().length === 0) {
+      return { valid: false, reason: "Summary is empty" };
     }
 
+    const wordCount = summary.trim().split(/\s+/).length;
+    if (wordCount < MIN_SUMMARY_WORDS) {
+      return {
+        valid: false,
+        reason: `Summary too short: ${wordCount} words (minimum: ${MIN_SUMMARY_WORDS})`,
+      };
+    }
+
+    return { valid: true };
+  }
+
+  /**
+   * Capture telemetry for compaction completion
+   */
+  private captureCompactionTelemetry(event: StreamEndEvent, source: "idle" | "manual"): void {
     const durationSecs =
       typeof event.metadata.duration === "number" ? event.metadata.duration / 1000 : 0;
     const inputTokens =
@@ -149,16 +244,9 @@
         duration_b2: roundToBase2(durationSecs),
         input_tokens_b2: roundToBase2(inputTokens ?? 0),
         output_tokens_b2: roundToBase2(outputTokens ?? 0),
-        compaction_source: isIdleCompaction ? "idle" : "manual",
+        compaction_source: source,
       },
     });
-
-    // Notify that compaction completed (clears idle compaction pending state)
-    this.onCompactionComplete?.();
-
-    // Emit stream-end to frontend so UI knows compaction is complete
-    this.emitChatEvent(event);
-    return true;
   }
 
   /**
@@ -197,13 +285,6 @@
     // Extract diffs BEFORE clearing history (they'll be gone after clear)
     this.cachedFileDiffs = extractEditedFileDiffs(messages);
 
-    // Clear entire history and get deleted sequences
-    const clearResult = await this.historyService.clearHistory(this.workspaceId);
-    if (!clearResult.success) {
-      return Err(`Failed to clear history: ${clearResult.error}`);
-    }
-    const deletedSequences = clearResult.data;
-
     // For idle compaction, preserve the original recency timestamp so the workspace
     // doesn't appear "recently used" in the sidebar. Use the shared recency utility
     // to ensure consistency with how the sidebar computes recency.
@@ -227,6 +308,9 @@
       "assistant",
       summary,
       {
+        // Ensures the UI can later delete/replace the summary message (e.g., on /clear).
+        // replaceHistory() will normalize sequences starting at 0.
+        historySequence: 0,
         timestamp,
         compacted: isIdleCompaction ? "idle" : "user",
         model: metadata.model,
@@ -237,13 +321,24 @@
       }
     );
 
-    // Append summary to history
-    const appendResult = await this.historyService.appendToHistory(
-      this.workspaceId,
-      summaryMessage
-    );
-    if (!appendResult.success) {
-      return Err(`Failed to append summary: ${appendResult.error}`);
+    // Emit delete events based on the history we used for compaction.
+    //
+    // Why not rely on HistoryService.replaceHistory()'s deleted sequences?
+    // - The UI currently has these messages in memory (from streaming), so this is the
+    //   authoritative list to clear from the transcript.
+    // - It avoids edge cases where chat.jsonl parsing skips a malformed trailing line,
+    //   resulting in an empty deletedSequences list and a non-updating UI.
+    const deletedSequences = messages
      .map((msg) => msg.metadata?.historySequence ?? -1)
      .filter((s) => s >= 0);
+
+    // Atomically replace history with the single summary message.
+    // This avoids the "delete then crash" failure mode.
+    const replaceResult = await this.historyService.replaceHistory(this.workspaceId, [
+      summaryMessage,
+    ]);
+    if (!replaceResult.success) {
+      return Err(`Failed to replace history: ${replaceResult.error}`);
     }
 
     // Set flag to trigger post-compaction attachment injection on next turn
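For reference, a minimal standalone sketch of the validation rule above (word counting is whitespace-delimited, so formatting does not affect the outcome; only MIN_SUMMARY_WORDS and the return shape come from the diff, the rest is illustrative):

// Illustration only, not project code.
const MIN_SUMMARY_WORDS = 10;

function validateSummary(summary: string): { valid: true } | { valid: false; reason: string } {
  if (!summary || summary.trim().length === 0) {
    return { valid: false, reason: "Summary is empty" };
  }
  const wordCount = summary.trim().split(/\s+/).length;
  if (wordCount < MIN_SUMMARY_WORDS) {
    return { valid: false, reason: `Summary too short: ${wordCount} words (minimum: ${MIN_SUMMARY_WORDS})` };
  }
  return { valid: true };
}

// validateSummary("")      -> invalid ("Summary is empty"); stream-end still forwarded, history untouched
// validateSummary("Done.") -> invalid ("Summary too short: 1 words (minimum: 10)")
// validateSummary("The user refactored compaction into a control-plane endpoint with validation.")
//                          -> valid (10 words), history is replaced with the summary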
diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts
index fbb86a6b80..531c9020d9 100644
--- a/src/node/services/historyService.ts
+++ b/src/node/services/historyService.ts
@@ -422,6 +422,60 @@ export class HistoryService {
     });
   }
 
+  /**
+   * Atomically replace the entire chat history with the provided messages.
+   *
+   * This is the preferred primitive for compaction: it guarantees we never delete history
+   * and then crash before writing the replacement. The underlying write is atomic.
+   *
+   * @returns Result containing array of deleted historySequence numbers
+   */
+  async replaceHistory(
+    workspaceId: string,
+    newMessages: MuxMessage[]
+  ): Promise<Result<number[]>> {
+    return this.fileLocks.withLock(workspaceId, async () => {
+      try {
+        const historyPath = this.getChatHistoryPath(workspaceId);
+        const workspaceDir = this.config.getSessionDir(workspaceId);
+        await fs.mkdir(workspaceDir, { recursive: true });
+
+        const historyResult = await this.getHistory(workspaceId);
+        const deletedSequences = historyResult.success
+          ? historyResult.data
+              .map((msg) => msg.metadata?.historySequence ?? -1)
+              .filter((s) => s >= 0)
+          : [];
+
+        // Normalize sequence numbers to start at 0
+        const normalizedMessages = newMessages.map((msg, index) => {
+          return {
+            ...msg,
+            metadata: {
+              ...(msg.metadata ?? {}),
+              historySequence: index,
+            },
+          };
+        });
+
+        const historyEntries = normalizedMessages
+          .map((msg) => JSON.stringify({ ...msg, workspaceId }) + "\n")
+          .join("");
+
+        // Atomic write prevents corruption if app crashes mid-write.
+        // If historyEntries is empty, this creates/overwrites the file with an empty payload.
+        await writeFileAtomic(historyPath, historyEntries);
+
+        // Update sequence counter to continue from the end
+        this.sequenceCounters.set(workspaceId, normalizedMessages.length);
+
+        return Ok(deletedSequences);
+      } catch (error) {
+        const message = error instanceof Error ? error.message : String(error);
+        return Err(`Failed to replace history: ${message}`);
+      }
+    });
+  }
 
   async clearHistory(workspaceId: string): Promise<Result<number[]>> {
     const result = await this.truncateHistory(workspaceId, 1.0);
     if (!result.success) {
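A minimal usage sketch of the new primitive, assuming a HistoryService instance in scope and the same createMuxMessage factory and Result helpers used elsewhere in the codebase (ids and text are illustrative):

// Inside an async function (sketch, not project code).
const summaryMessage = createMuxMessage(
  "summary-abc123",
  "assistant",
  "Summary of the conversation so far, long enough to pass the word-count validation.",
  {
    historySequence: 0, // replaceHistory() re-normalizes sequences anyway
    compacted: "user",
  }
);

const replaceResult = await historyService.replaceHistory(workspaceId, [summaryMessage]);
if (replaceResult.success) {
  // Sequences that existed before the swap, e.g. [0, 1, 2, 3];
  // callers can use these to emit delete events for the UI.
  const deletedSequences = replaceResult.data;
}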
diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts
index 9f8303f288..3f0524e186 100644
--- a/src/node/services/workspaceService.ts
+++ b/src/node/services/workspaceService.ts
@@ -1873,6 +1873,112 @@ export class WorkspaceService extends EventEmitter {
     return Ok(undefined);
   }
 
+  /**
+   * Control-plane endpoint for compaction.
+   * Atomically interrupts any active stream (if requested) and starts compaction.
+   *
+   * Unlike sendMessage with muxMetadata, this:
+   * - Does not persist a user message with the compaction prompt
+   * - Uses session state to track compaction operation
+   * - Provides explicit operationId for correlation
+   */
+  async compactHistory(
+    workspaceId: string,
+    options: {
+      model?: string;
+      maxOutputTokens?: number;
+      continueMessage?: {
+        text: string;
+        imageParts?: ImagePart[];
+        model?: string;
+        mode?: "exec" | "plan";
+      };
+      source?: "user" | "force-compaction" | "idle-compaction";
+      interrupt?: "none" | "graceful" | "abort";
+      sendMessageOptions?: Omit;
+    }
+  ): Promise<Result<{ operationId: string }>> {
+    const operationId = `compact-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;
+    const source = options.source ?? "user";
+    const interrupt = options.interrupt ?? "none";
+
+    log.debug("compactHistory: starting", {
+      workspaceId,
+      operationId,
+      source,
+      interrupt,
+      hasModel: !!options.model,
+      hasContinueMessage: !!options.continueMessage,
+    });
+
+    try {
+      // Block operations on workspaces being renamed/removed
+      if (this.renamingWorkspaces.has(workspaceId)) {
+        return Err({
+          type: "unknown",
+          raw: "Workspace is being renamed. Please wait and try again.",
+        });
+      }
+      if (this.removingWorkspaces.has(workspaceId)) {
+        return Err({
+          type: "unknown",
+          raw: "Workspace is being deleted. Please wait and try again.",
+        });
+      }
+      if (!this.config.findWorkspace(workspaceId)) {
+        return Err({ type: "unknown", raw: "Workspace not found. It may have been deleted." });
+      }
+
+      const session = this.getOrCreateSession(workspaceId);
+
+      // Handle active stream based on interrupt mode
+      if (this.aiService.isStreaming(workspaceId)) {
+        switch (interrupt) {
+          case "none":
+            return Err({
+              type: "unknown",
+              raw: "Cannot compact while stream is active. Use interrupt option or wait for stream to complete.",
+            });
+          case "graceful":
+            // Wait for stream to complete (soft interrupt)
+            await session.interruptStream({ soft: true });
+            break;
+          case "abort":
+            // Interrupt immediately.
+            // NOTE: We intentionally preserve any streamed partial output.
+            // startCompaction() will commit partial.json to history before building the compaction prompt.
+            await session.interruptStream();
+            break;
+        }
+      }
+
+      // Start compaction via session
+      const result = await session.startCompaction({
+        operationId,
+        model: options.model,
+        maxOutputTokens: options.maxOutputTokens,
+        continueMessage: options.continueMessage,
+        source,
+        sendMessageOptions: options.sendMessageOptions,
+      });
+
+      if (!result.success) {
+        return result;
+      }
+
+      // Skip recency update for idle compaction to preserve "last used" time
+      if (source !== "idle-compaction") {
+        void this.updateRecencyTimestamp(workspaceId, Date.now());
+      }
+
+      return Ok({ operationId });
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : JSON.stringify(error, null, 2);
+      log.error("Unexpected error in compactHistory:", error);
+      return Err({ type: "unknown", raw: `Failed to compact history: ${errorMessage}` });
+    }
+  }
+
   async replaceHistory(
     workspaceId: string,
     summaryMessage: MuxMessage,
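Caller-side, a rough sketch of how the interrupt modes are meant to be chosen; the mapping is inferred from the switch above, and the client object and model string are assumptions (the integration test below reaches the same endpoint via env.orpc.workspace.compactHistory):

// Sketch only.
//   "none"     - refuse to compact if a stream is running (endpoint returns an error)
//   "graceful" - soft-interrupt the current stream, then compact
//   "abort"    - stop the stream immediately; streamed partial output is still committed before compaction
const result = await workspaceClient.compactHistory({
  workspaceId,
  source: "idle-compaction", // background trigger, so recency is not bumped
  interrupt: "none",         // never cut off an active stream for an idle compaction
  sendMessageOptions: { model: "anthropic:example-haiku" }, // illustrative model string
});
if (!result.success) {
  // Most likely a stream was active; retry on the next idle check.
}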
"" + ); +} + +describeIntegration("compactHistory integration tests", () => { + configureTestRetries(3); + + test.concurrent( + "should compact history and then auto-send continueMessage", + async () => { + await withSharedWorkspace("anthropic", async ({ env, workspaceId, collector }) => { + const haiku = modelString("anthropic", KNOWN_MODELS.HAIKU.providerModelId); + + // Seed history quickly (test-only) to avoid extra API calls. + const seededIds = await seedHistoryMessages(workspaceId, env.config, [ + { + id: "seed-user-0", + role: "user", + content: "Context: we are discussing a small code refactor.", + }, + { + id: "seed-assistant-0", + role: "assistant", + content: "Acknowledged. I will help.", + }, + { + id: "seed-user-1", + role: "user", + content: "Please keep responses short and practical.", + }, + { + id: "seed-assistant-1", + role: "assistant", + content: "Understood.", + }, + ]); + + collector.clear(); + + const continueText = "Continue: reply with exactly 'OK'."; + + // Trigger compaction via the control-plane API. + const compactResult = await env.orpc.workspace.compactHistory({ + workspaceId, + model: haiku, + maxOutputTokens: 800, + source: "user", + interrupt: "none", + continueMessage: { text: continueText }, + sendMessageOptions: { + model: haiku, + thinkingLevel: "off", + }, + }); + + expect(compactResult.success).toBe(true); + if (!compactResult.success) { + throw new Error(String(compactResult.error)); + } + + // Wait for compaction stream to start + end. + const compactionStreamStart = await collector.waitForEventN("stream-start", 1, 20000); + expect(compactionStreamStart).not.toBeNull(); + const compactionMessageId = (compactionStreamStart as { messageId: string }).messageId; + + const compactionStreamEnd = await collector.waitForEventN("stream-end", 1, 45000); + expect(compactionStreamEnd).not.toBeNull(); + expect((compactionStreamEnd as { messageId: string }).messageId).toBe(compactionMessageId); + expect((compactionStreamEnd as { metadata: { model?: string } }).metadata.model).toBe( + haiku + ); + + // Compaction should emit delete + summary message. + const deleteEvent = await collector.waitForEvent("delete", 10000); + expect(deleteEvent).not.toBeNull(); + + const summaryMessage = collector + .getEvents() + .find( + (e) => e.type === "message" && e.role === "assistant" && Boolean(e.metadata?.compacted) + ); + expect(summaryMessage).toBeDefined(); + + // Continue message should be persisted as a user message and then streamed. + // We expect a second stream-start for the follow-up message. 
+ const continueStreamStart = await collector.waitForEventN("stream-start", 2, 20000); + expect(continueStreamStart).not.toBeNull(); + + const continueMessageId = (continueStreamStart as { messageId: string }).messageId; + expect(continueMessageId).not.toBe(compactionMessageId); + + const continueUserMessage = collector + .getEvents() + .find( + (e) => + e.type === "message" && + e.role === "user" && + getTextFromMessageParts(e) === continueText + ); + expect(continueUserMessage).toBeDefined(); + + const continueStreamEnd = await collector.waitForEventN("stream-end", 2, 45000); + expect(continueStreamEnd).not.toBeNull(); + expect((continueStreamEnd as { messageId: string }).messageId).toBe(continueMessageId); + + // Verify persisted history: + // - seeded messages were removed + // - summary exists + // - continue message exists + const replay = await env.orpc.workspace.getFullReplay({ workspaceId }); + const replayMessages = replay.filter((m) => m.type === "message"); + + for (const id of seededIds) { + expect(replayMessages.some((m) => m.id === id)).toBe(false); + } + + const summaryIndex = replayMessages.findIndex( + (m) => m.role === "assistant" && Boolean(m.metadata?.compacted) + ); + expect(summaryIndex).toBe(0); + + const continueIndex = replayMessages.findIndex( + (m) => m.role === "user" && getTextFromMessageParts(m) === continueText + ); + expect(continueIndex).toBeGreaterThan(summaryIndex); + }); + }, + 90000 + ); +}); diff --git a/tests/ipc/helpers.ts b/tests/ipc/helpers.ts index bc0ceae9ec..c69745acb2 100644 --- a/tests/ipc/helpers.ts +++ b/tests/ipc/helpers.ts @@ -590,6 +590,36 @@ export async function cleanupTempGitRepo(repoPath: string): Promise { console.warn(`Failed to cleanup temp git repo after ${maxRetries} attempts:`, lastError); } +/** + * Seed a workspace history with explicit messages. + * + * Test-only: uses HistoryService directly to populate chat.jsonl without making API calls. + * Real application code should NEVER bypass IPC like this. + */ +export async function seedHistoryMessages( + workspaceId: string, + config: { getSessionDir: (id: string) => string }, + messages: Array<{ id?: string; role: "user" | "assistant"; content: string }> +): Promise { + // HistoryService only needs getSessionDir, so we can cast the partial config. + const historyService = new HistoryService(config as any); + + const ids: string[] = []; + for (let i = 0; i < messages.length; i++) { + const entry = messages[i]; + const id = entry.id ?? `seed-msg-${i}`; + ids.push(id); + + const message = createMuxMessage(id, entry.role, entry.content, {}); + const result = await historyService.appendToHistory(workspaceId, message); + if (!result.success) { + throw new Error(`Failed to append history message ${i} (${id}): ${result.error}`); + } + } + + return ids; +} + /** * Build large conversation history to test context limits *