From b544b4f4035f621cd10b3a73cc7f8ee2d03d4e40 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 20 Dec 2025 19:09:02 -0600 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=A4=96=20fix:=20make=20compaction=20c?= =?UTF-8?q?rash-safe=20and=20recover=20via=20resume=20manager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/hooks/useResumeManager.ts | 112 ++++++++++++----- src/browser/utils/chatCommands.ts | 6 + src/browser/utils/compaction/handler.ts | 14 ++- src/common/orpc/schemas/stream.ts | 2 +- src/node/services/agentSession.ts | 2 + src/node/services/compactionHandler.test.ts | 93 +++++--------- src/node/services/compactionHandler.ts | 16 +-- src/node/services/historyService.test.ts | 127 ++++++++++++++++++++ src/node/services/historyService.ts | 63 ++++++++++ src/node/services/workspaceService.ts | 17 ++- 10 files changed, 338 insertions(+), 114 deletions(-) diff --git a/src/browser/hooks/useResumeManager.ts b/src/browser/hooks/useResumeManager.ts index b0fa88e2ed..aca7c8c97b 100644 --- a/src/browser/hooks/useResumeManager.ts +++ b/src/browser/hooks/useResumeManager.ts @@ -1,14 +1,18 @@ import { useEffect, useRef } from "react"; import { useWorkspaceStoreRaw, type WorkspaceState } from "@/browser/stores/WorkspaceStore"; import { CUSTOM_EVENTS, type CustomEventType } from "@/common/constants/events"; -import { getAutoRetryKey, getRetryStateKey } from "@/common/constants/storage"; +import { + getAutoRetryKey, + getRetryStateKey, + getCancelledCompactionKey, +} from "@/common/constants/storage"; import { getSendOptionsFromStorage } from "@/browser/utils/messages/sendOptions"; import { readPersistedState, updatePersistedState } from "./usePersistedState"; import { isEligibleForAutoRetry, isNonRetryableSendError, } from "@/browser/utils/messages/retryEligibility"; -import { applyCompactionOverrides } from "@/browser/utils/messages/compactionOptions"; +import { executeCompaction } from "@/browser/utils/chatCommands"; import type { SendMessageError } from "@/common/types/errors"; import { createFailedRetryState, @@ -23,6 +27,15 @@ export interface RetryState { lastError?: SendMessageError; } +/** + * Persisted marker for user-cancelled compaction. + * Used to distinguish intentional cancellation (Ctrl+C) from crash/force-exit. + */ +export interface CancelledCompactionMarker { + messageId: string; + timestamp: number; +} + /** * Centralized auto-resume manager for interrupted streams * @@ -163,45 +176,80 @@ export function useResumeManager() { ); try { + if (!api) { + retryingRef.current.delete(workspaceId); + return; + } + // Start with workspace defaults - let options = getSendOptionsFromStorage(workspaceId); + const options = getSendOptionsFromStorage(workspaceId); // Check if last user message was a compaction request const state = workspaceStatesRef.current.get(workspaceId); - if (state) { - const lastUserMsg = [...state.messages].reverse().find((msg) => msg.type === "user"); - if (lastUserMsg?.compactionRequest) { - // Apply compaction overrides using shared function (same as ChatInput) - // This ensures custom model/tokens are preserved across resume - options = applyCompactionOverrides(options, { - model: lastUserMsg.compactionRequest.parsed.model, - maxOutputTokens: lastUserMsg.compactionRequest.parsed.maxOutputTokens, - continueMessage: { - text: lastUserMsg.compactionRequest.parsed.continueMessage?.text ?? "", - imageParts: lastUserMsg.compactionRequest.parsed.continueMessage?.imageParts, - model: lastUserMsg.compactionRequest.parsed.continueMessage?.model ?? options.model, - mode: lastUserMsg.compactionRequest.parsed.continueMessage?.mode ?? "exec", - }, - }); + const lastUserMsg = state?.messages + ? [...state.messages].reverse().find((msg) => msg.type === "user") + : undefined; + + if (lastUserMsg?.compactionRequest) { + // Check if this compaction was user-cancelled (Ctrl+C) + const cancelledMarker = readPersistedState( + getCancelledCompactionKey(workspaceId), + null + ); + + if (cancelledMarker && cancelledMarker.messageId === lastUserMsg.id) { + if (!isManual) { + // User explicitly cancelled this compaction - don't auto-retry + console.debug( + `[retry] ${workspaceId} skipping cancelled compaction (messageId=${lastUserMsg.id})` + ); + return; + } + + // Manual retry: clear the marker and proceed + updatePersistedState(getCancelledCompactionKey(workspaceId), () => null); } - } - if (!api) { - retryingRef.current.delete(workspaceId); - return; - } - const result = await api.workspace.resumeStream({ workspaceId, options }); + // Retry compaction via executeCompaction (re-sends the compaction request) + // This properly rebuilds the compaction-specific behavior including continueMessage queuing + console.debug(`[retry] ${workspaceId} retrying interrupted compaction`); + const { parsed } = lastUserMsg.compactionRequest; + const result = await executeCompaction({ + api, + workspaceId, + sendMessageOptions: options, + model: parsed.model, + maxOutputTokens: parsed.maxOutputTokens, + continueMessage: parsed.continueMessage, + editMessageId: lastUserMsg.id, // Edit the existing compaction request message + }); - if (!result.success) { - // Store error in retry state so RetryBarrier can display it - const newState = createFailedRetryState(attempt, result.error); - console.debug( - `[retry] ${workspaceId} resumeStream failed: attempt ${attempt} → ${newState.attempt}` - ); - updatePersistedState(getRetryStateKey(workspaceId), newState); + if (!result.success) { + const errorData: SendMessageError = { + type: "unknown", + raw: result.error ?? "Failed to retry compaction", + }; + const newState = createFailedRetryState(attempt, errorData); + console.debug( + `[retry] ${workspaceId} compaction failed: attempt ${attempt} → ${newState.attempt}` + ); + updatePersistedState(getRetryStateKey(workspaceId), newState); + } + } else { + // Normal stream resume (non-compaction) + const result = await api.workspace.resumeStream({ workspaceId, options }); + + if (!result.success) { + // Store error in retry state so RetryBarrier can display it + const newState = createFailedRetryState(attempt, result.error); + console.debug( + `[retry] ${workspaceId} resumeStream failed: attempt ${attempt} → ${newState.attempt}` + ); + updatePersistedState(getRetryStateKey(workspaceId), newState); + } } // Note: Don't clear retry state on success - stream-end event will handle that - // resumeStream success just means "stream initiated", not "stream completed" + // resumeStream/executeCompaction success just means "stream initiated", not "stream completed" // Clearing here causes backoff reset bug when stream starts then immediately fails } catch (error) { // Store error in retry state for display diff --git a/src/browser/utils/chatCommands.ts b/src/browser/utils/chatCommands.ts index f7264c9528..b5da4dba70 100644 --- a/src/browser/utils/chatCommands.ts +++ b/src/browser/utils/chatCommands.ts @@ -26,6 +26,8 @@ import { resolveCompactionModel, isValidModelFormat, } from "@/browser/utils/messages/compactionModelPreference"; +import { getCancelledCompactionKey } from "@/common/constants/storage"; +import { updatePersistedState } from "@/browser/hooks/usePersistedState"; import type { ImageAttachment } from "../components/ImageAttachments"; import { dispatchWorkspaceSwitch } from "./workspaceEvents"; import { getRuntimeKey, copyWorkspaceStorage } from "@/common/constants/storage"; @@ -695,6 +697,10 @@ export function prepareCompactionMessage(options: CompactionOptions): { export async function executeCompaction( options: CompactionOptions & { api: RouterClient } ): Promise { + // Clear any cancelled-compaction marker since we're (re-)starting compaction + // This allows auto-retry to work if this attempt is interrupted by crash/force-exit + updatePersistedState(getCancelledCompactionKey(options.workspaceId), () => null); + const { messageText, metadata, sendOptions } = prepareCompactionMessage(options); const result = await options.api.workspace.sendMessage({ diff --git a/src/browser/utils/compaction/handler.ts b/src/browser/utils/compaction/handler.ts index 3034cc89d5..9011041cb8 100644 --- a/src/browser/utils/compaction/handler.ts +++ b/src/browser/utils/compaction/handler.ts @@ -5,8 +5,12 @@ * with original /compact command restored for re-editing. */ -import type { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator"; import type { APIClient } from "@/browser/contexts/API"; +import { updatePersistedState } from "@/browser/hooks/usePersistedState"; +import type { CancelledCompactionMarker } from "@/browser/hooks/useResumeManager"; +import type { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator"; +import { getCancelledCompactionKey } from "@/common/constants/storage"; + import { buildCompactionEditText } from "./format"; /** @@ -81,6 +85,14 @@ export async function cancelCompaction( // clobber the edit buffer. startEditingMessage(compactionRequestMsg.id, command); + // Mark this compaction as user-cancelled so auto-retry doesn't pick it up. + // This distinguishes intentional Ctrl+C from crash/force-exit. + const marker: CancelledCompactionMarker = { + messageId: compactionRequestMsg.id, + timestamp: Date.now(), + }; + updatePersistedState(getCancelledCompactionKey(workspaceId), () => marker); + // Interrupt stream with abandonPartial flag // Backend detects this and skips compaction (Ctrl+C flow) await client.workspace.interruptStream({ diff --git a/src/common/orpc/schemas/stream.ts b/src/common/orpc/schemas/stream.ts index c7db956103..1a8bbc8d21 100644 --- a/src/common/orpc/schemas/stream.ts +++ b/src/common/orpc/schemas/stream.ts @@ -322,7 +322,7 @@ export const WorkspaceChatMessageSchema = z.discriminatedUnion("type", [ UsageDeltaEventSchema, QueuedMessageChangedEventSchema, RestoreToInputEventSchema, - // Idle compaction notification + // Compaction notifications IdleCompactionNeededEventSchema, // Init events ...WorkspaceInitEventSchema.def.options, diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 0d5afcddec..16f144aab6 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -299,6 +299,8 @@ export class AgentSession { workspaceId: this.workspaceId, message: { type: "caught-up" }, }); + // Note: Aborted compaction recovery is handled by useResumeManager on the frontend, + // which detects interrupted compaction-request messages and retries via executeCompaction. } } diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index 5a49aeb8e3..8164d1120e 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -24,12 +24,16 @@ const createMockHistoryService = () => { let getHistoryResult: Result = Ok([]); let clearHistoryResult: Result = Ok([]); let appendToHistoryResult: Result = Ok(undefined); + let replaceHistoryResult: Result<{ deletedSequences: number[] }, string> = Ok({ + deletedSequences: [], + }); const getHistory = mock((_) => Promise.resolve(getHistoryResult)); const clearHistory = mock((_) => Promise.resolve(clearHistoryResult)); const appendToHistory = mock((_, __) => Promise.resolve(appendToHistoryResult)); const updateHistory = mock(() => Promise.resolve(Ok(undefined))); const truncateAfterMessage = mock(() => Promise.resolve(Ok(undefined))); + const replaceHistoryWithSummary = mock((_, __) => Promise.resolve(replaceHistoryResult)); return { getHistory, @@ -37,6 +41,7 @@ const createMockHistoryService = () => { appendToHistory, updateHistory, truncateAfterMessage, + replaceHistoryWithSummary, // Allow setting mock return values mockGetHistory: (result: Result) => { getHistoryResult = result; @@ -47,6 +52,9 @@ const createMockHistoryService = () => { mockAppendToHistory: (result: Result) => { appendToHistoryResult = result; }, + mockReplaceHistory: (result: Result<{ deletedSequences: number[] }, string>) => { + replaceHistoryResult = result; + }, }; }; @@ -158,7 +166,7 @@ describe("CompactionHandler", () => { const result = await handler.handleCompletion(event); expect(result).toBe(false); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(0); + expect(mockHistoryService.replaceHistoryWithSummary.mock.calls).toHaveLength(0); }); it("should return false when historyService fails", async () => { @@ -235,7 +243,8 @@ describe("CompactionHandler", () => { }; await handler.handleCompletion(event); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + const appendedMsg = mockHistoryService.replaceHistoryWithSummary.mock + .calls[0][1] as MuxMessage; expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( "Part 1 Part 2 Part 3" ); @@ -244,28 +253,25 @@ describe("CompactionHandler", () => { it("should extract summary text from event.parts", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("This is the summary"); await handler.handleCompletion(event); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + const appendedMsg = mockHistoryService.replaceHistoryWithSummary.mock + .calls[0][1] as MuxMessage; expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( "This is the summary" ); }); - it("should delete partial.json before clearing history (race condition fix)", async () => { + it("should delete partial.json before replacing history (race condition fix)", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); - // deletePartial should be called once before clearHistory + // deletePartial should be called once before replaceHistoryWithSummary expect(mockPartialService.deletePartial.mock.calls).toHaveLength(1); expect(mockPartialService.deletePartial.mock.calls[0][0]).toBe(workspaceId); @@ -273,20 +279,18 @@ describe("CompactionHandler", () => { // but the important thing is that it IS called during compaction) }); - it("should call clearHistory() and appendToHistory()", async () => { + it("should call replaceHistoryWithSummary() atomically", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.clearHistory.mock.calls[0][0]).toBe(workspaceId); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.appendToHistory.mock.calls[0][0]).toBe(workspaceId); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + // Should use atomic replacement instead of clear + append + expect(mockHistoryService.replaceHistoryWithSummary.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistoryWithSummary.mock.calls[0][0]).toBe(workspaceId); + const appendedMsg = mockHistoryService.replaceHistoryWithSummary.mock + .calls[0][1] as MuxMessage; expect(appendedMsg.role).toBe("assistant"); expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe("Summary"); }); @@ -294,8 +298,7 @@ describe("CompactionHandler", () => { it("should emit delete event for old messages", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1, 2, 3])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + mockHistoryService.mockReplaceHistory(Ok({ deletedSequences: [0, 1, 2, 3] })); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); @@ -311,8 +314,6 @@ describe("CompactionHandler", () => { it("should emit summary message with complete metadata", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const usage = { inputTokens: 200, outputTokens: 100, totalTokens: 300 }; const event = createStreamEndEvent("Summary", { @@ -344,8 +345,6 @@ describe("CompactionHandler", () => { it("should emit stream-end event to frontend", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary", { duration: 1234 }); await handler.handleCompletion(event); @@ -360,13 +359,12 @@ describe("CompactionHandler", () => { it("should set compacted in summary metadata", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; + const appendedMsg = mockHistoryService.replaceHistoryWithSummary.mock + .calls[0][1] as MuxMessage; expect(appendedMsg.metadata?.compacted).toBe("user"); }); }); @@ -375,20 +373,16 @@ describe("CompactionHandler", () => { it("should track processed compaction-request IDs", async () => { const compactionReq = createCompactionRequest("req-unique"); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistoryWithSummary.mock.calls).toHaveLength(1); }); it("should return true without re-processing when same request ID seen twice", async () => { const compactionReq = createCompactionRequest("req-dupe"); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); const result1 = await handler.handleCompletion(event); @@ -396,14 +390,12 @@ describe("CompactionHandler", () => { expect(result1).toBe(true); expect(result2).toBe(true); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistoryWithSummary.mock.calls).toHaveLength(1); }); it("should not emit duplicate events", async () => { const compactionReq = createCompactionRequest("req-dupe-2"); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); @@ -415,39 +407,23 @@ describe("CompactionHandler", () => { expect(eventCountAfterSecond).toBe(eventCountAfterFirst); }); - it("should not clear history twice", async () => { + it("should not replace history twice", async () => { const compactionReq = createCompactionRequest("req-dupe-3"); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistoryWithSummary.mock.calls).toHaveLength(1); }); }); describe("Error Handling", () => { - it("should return false when clearHistory() fails", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Clear failed")); - - const event = createStreamEndEvent("Summary"); - const result = await handler.handleCompletion(event); - - expect(result).toBe(false); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(0); - }); - - it("should return false when appendToHistory() fails", async () => { + it("should return false when replaceHistoryWithSummary() fails", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Err("Append failed")); + mockHistoryService.mockReplaceHistory(Err("Replace failed")); const event = createStreamEndEvent("Summary"); const result = await handler.handleCompletion(event); @@ -458,7 +434,7 @@ describe("CompactionHandler", () => { it("should log errors but not throw", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Database corruption")); + mockHistoryService.mockReplaceHistory(Err("Database corruption")); const event = createStreamEndEvent("Summary"); @@ -470,7 +446,7 @@ describe("CompactionHandler", () => { it("should not emit events when compaction fails mid-process", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Clear failed")); + mockHistoryService.mockReplaceHistory(Err("Replace failed")); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); @@ -483,8 +459,6 @@ describe("CompactionHandler", () => { it("should include workspaceId in all chat-event emissions", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); @@ -499,8 +473,7 @@ describe("CompactionHandler", () => { it("should emit DeleteMessage with correct type and historySequences array", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([5, 10, 15])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + mockHistoryService.mockReplaceHistory(Ok({ deletedSequences: [5, 10, 15] })); const event = createStreamEndEvent("Summary"); await handler.handleCompletion(event); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index 62cbeeae51..8d163f70c7 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -197,13 +197,6 @@ export class CompactionHandler { // Extract diffs BEFORE clearing history (they'll be gone after clear) this.cachedFileDiffs = extractEditedFileDiffs(messages); - // Clear entire history and get deleted sequences - const clearResult = await this.historyService.clearHistory(this.workspaceId); - if (!clearResult.success) { - return Err(`Failed to clear history: ${clearResult.error}`); - } - const deletedSequences = clearResult.data; - // For idle compaction, preserve the original recency timestamp so the workspace // doesn't appear "recently used" in the sidebar. Use the shared recency utility // to ensure consistency with how the sidebar computes recency. @@ -237,14 +230,15 @@ export class CompactionHandler { } ); - // Append summary to history - const appendResult = await this.historyService.appendToHistory( + // Atomically replace history with summary (crash-safe: no window where history is deleted but summary not written) + const replaceResult = await this.historyService.replaceHistoryWithSummary( this.workspaceId, summaryMessage ); - if (!appendResult.success) { - return Err(`Failed to append summary: ${appendResult.error}`); + if (!replaceResult.success) { + return Err(`Failed to replace history: ${replaceResult.error}`); } + const { deletedSequences } = replaceResult.data; // Set flag to trigger post-compaction attachment injection on next turn this.postCompactionAttachmentsPending = true; diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 6d3229676e..7532a4d8da 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -523,4 +523,131 @@ describe("HistoryService", () => { } }); }); + + describe("replaceHistoryWithSummary", () => { + it("should atomically replace history with summary message", async () => { + const workspaceId = "workspace1"; + + // Set up existing history with multiple messages + const msg1 = createMuxMessage("msg1", "user", "Hello", { historySequence: 0 }); + const msg2 = createMuxMessage("msg2", "assistant", "Hi there", { historySequence: 1 }); + const msg3 = createMuxMessage("msg3", "user", "How are you?", { historySequence: 2 }); + const msg4 = createMuxMessage("msg4", "assistant", "I am fine", { historySequence: 3 }); + + await service.appendToHistory(workspaceId, msg1); + await service.appendToHistory(workspaceId, msg2); + await service.appendToHistory(workspaceId, msg3); + await service.appendToHistory(workspaceId, msg4); + + // Create summary message + const summary = createMuxMessage("summary1", "assistant", "Summary of conversation", { + compacted: "user", + }); + + // Replace history with summary + const result = await service.replaceHistoryWithSummary(workspaceId, summary); + + expect(result.success).toBe(true); + if (result.success) { + // Should return deleted sequence numbers + expect(result.data.deletedSequences).toEqual([0, 1, 2, 3]); + } + + // Verify history contains only summary + const history = await service.getHistory(workspaceId); + expect(history.success).toBe(true); + if (history.success) { + expect(history.data).toHaveLength(1); + expect(history.data[0].id).toBe("summary1"); + expect(history.data[0].metadata?.historySequence).toBe(0); + } + }); + + it("should set next sequence to 1 after replacing with summary", async () => { + const workspaceId = "workspace1"; + + // Set up existing history + const msg1 = createMuxMessage("msg1", "user", "Hello", { historySequence: 0 }); + await service.appendToHistory(workspaceId, msg1); + + // Replace with summary + const summary = createMuxMessage("summary1", "assistant", "Summary"); + await service.replaceHistoryWithSummary(workspaceId, summary); + + // Append a new message - should get sequence 1 + const newMsg = createMuxMessage("newMsg", "user", "New message"); + await service.appendToHistory(workspaceId, newMsg); + + const history = await service.getHistory(workspaceId); + expect(history.success).toBe(true); + if (history.success) { + expect(history.data).toHaveLength(2); + expect(history.data[0].metadata?.historySequence).toBe(0); // summary + expect(history.data[1].metadata?.historySequence).toBe(1); // new message + } + }); + + it("should work on empty history", async () => { + const workspaceId = "workspace1"; + + // Replace empty history with summary + const summary = createMuxMessage("summary1", "assistant", "Summary of nothing"); + const result = await service.replaceHistoryWithSummary(workspaceId, summary); + + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.deletedSequences).toEqual([]); + } + + const history = await service.getHistory(workspaceId); + expect(history.success).toBe(true); + if (history.success) { + expect(history.data).toHaveLength(1); + expect(history.data[0].id).toBe("summary1"); + } + }); + + it("should preserve summary message metadata except historySequence", async () => { + const workspaceId = "workspace1"; + + const summary = createMuxMessage("summary1", "assistant", "Summary", { + compacted: "idle", + timestamp: 12345, + model: "test-model", + }); + + await service.replaceHistoryWithSummary(workspaceId, summary); + + const history = await service.getHistory(workspaceId); + expect(history.success).toBe(true); + if (history.success) { + const msg = history.data[0]; + expect(msg.metadata?.compacted).toBe("idle"); + expect(msg.metadata?.timestamp).toBe(12345); + expect(msg.metadata?.model).toBe("test-model"); + expect(msg.metadata?.historySequence).toBe(0); // Always 0 for summary + } + }); + + it("should create workspace directory if it does not exist", async () => { + const workspaceId = "new-workspace-for-replace"; + const workspaceDir = config.getSessionDir(workspaceId); + + // Ensure directory doesn't exist + try { + await fs.rmdir(workspaceDir, { recursive: true }); + } catch { + // Ignore if doesn't exist + } + + const summary = createMuxMessage("summary1", "assistant", "Summary"); + const result = await service.replaceHistoryWithSummary(workspaceId, summary); + + expect(result.success).toBe(true); + + // Verify file was created + const stats = await fs.stat(path.join(workspaceDir, "chat.jsonl")); + expect(stats.isFile()).toBe(true); + }); + }); }); diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index fbb86a6b80..481398b43c 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -422,6 +422,69 @@ export class HistoryService { }); } + /** + * Atomically replace all history with a single summary message. + * + * This is a crash-safe operation: if power is lost mid-write, either the old + * history remains intact or the new summary is fully written. There is no + * intermediate state where history is deleted but the summary hasn't been written. + * + * @param workspaceId The workspace ID + * @param summaryMessage The summary message to replace history with + * @returns Result containing array of deleted historySequence numbers + */ + async replaceHistoryWithSummary( + workspaceId: string, + summaryMessage: MuxMessage + ): Promise> { + return this.fileLocks.withLock(workspaceId, async () => { + try { + const historyPath = this.getChatHistoryPath(workspaceId); + const workspaceDir = this.config.getSessionDir(workspaceId); + + // Ensure workspace directory exists + await fs.mkdir(workspaceDir, { recursive: true }); + + // Read existing messages to get deleted sequences + const historyResult = await this.getHistory(workspaceId); + const deletedSequences = historyResult.success + ? historyResult.data + .map((msg) => msg.metadata?.historySequence ?? -1) + .filter((s) => s >= 0) + : []; + + // Assign historySequence 0 to summary (it's the new "first" message) + const summaryWithSequence: MuxMessage = { + ...summaryMessage, + metadata: { + ...summaryMessage.metadata, + historySequence: 0, + }, + }; + + // Store with workspace context + const historyEntry = { + ...summaryWithSequence, + workspaceId, + }; + + // Atomic write: either succeeds completely or leaves old file intact + await writeFileAtomic(historyPath, JSON.stringify(historyEntry) + "\n"); + + // Update sequence counter: next message gets sequence 1 + this.sequenceCounters.set(workspaceId, 1); + + log.debug( + `[HISTORY REPLACE] Atomically replaced ${deletedSequences.length} messages with summary in ${workspaceId}` + ); + + return Ok({ deletedSequences }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + return Err(`Failed to replace history: ${message}`); + } + }); + } async clearHistory(workspaceId: string): Promise> { const result = await this.truncateHistory(workspaceId, 1.0); if (!result.success) { diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 9f8303f288..b6fd8e794a 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -1887,16 +1887,15 @@ export class WorkspaceService extends EventEmitter { } try { - const clearResult = await this.historyService.clearHistory(workspaceId); - if (!clearResult.success) { - return Err(`Failed to clear history: ${clearResult.error}`); - } - const deletedSequences = clearResult.data; - - const appendResult = await this.historyService.appendToHistory(workspaceId, summaryMessage); - if (!appendResult.success) { - return Err(`Failed to append summary message: ${appendResult.error}`); + // Atomically replace history (crash-safe: no window where history is deleted but summary not written) + const replaceResult = await this.historyService.replaceHistoryWithSummary( + workspaceId, + summaryMessage + ); + if (!replaceResult.success) { + return Err(`Failed to replace history: ${replaceResult.error}`); } + const { deletedSequences } = replaceResult.data; // Emit through the session so ORPC subscriptions receive the events const session = this.sessions.get(workspaceId); From dde4fe55c61008a348e1aef6e3261a833c728686 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 14:34:39 -0600 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=A4=96=20ci:=20add=20compaction=20cra?= =?UTF-8?q?sh/restart=20integration=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/ipc/compactionCrashAtomicity.test.ts | 211 ++++++++++++++ tests/ipc/compactionRestartRetry.test.ts | 261 ++++++++++++++++++ tests/ipc/setup.ts | 52 ++++ tests/ipc/workers/replaceChatHistoryWorker.ts | 107 +++++++ 4 files changed, 631 insertions(+) create mode 100644 tests/ipc/compactionCrashAtomicity.test.ts create mode 100644 tests/ipc/compactionRestartRetry.test.ts create mode 100644 tests/ipc/workers/replaceChatHistoryWorker.ts diff --git a/tests/ipc/compactionCrashAtomicity.test.ts b/tests/ipc/compactionCrashAtomicity.test.ts new file mode 100644 index 0000000000..262b9bd733 --- /dev/null +++ b/tests/ipc/compactionCrashAtomicity.test.ts @@ -0,0 +1,211 @@ +import * as path from "path"; +import * as fs from "fs/promises"; +import * as fsSync from "fs"; +import { spawn } from "child_process"; +import type { WorkspaceChatMessage } from "@/common/orpc/types"; +import { + shouldRunIntegrationTests, + createTestEnvironment, + createTestEnvironmentFromRootDir, + cleanupTestEnvironment, +} from "./setup"; +import { + createTempGitRepo, + cleanupTempGitRepo, + createWorkspace, + generateBranchName, +} from "./helpers"; +import { HistoryService } from "../../src/node/services/historyService"; +import { createMuxMessage } from "../../src/common/types/message"; + +const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip; + +function isChatMessage(event: WorkspaceChatMessage): event is WorkspaceChatMessage & { + type: "message"; + role: "user" | "assistant"; + metadata?: { compacted?: unknown }; +} { + return ( + typeof event === "object" && + event !== null && + "type" in event && + (event as { type?: unknown }).type === "message" && + "role" in event + ); +} + +async function waitForChatJsonlTmpFile(sessionDir: string, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; + + const hasTmp = async (): Promise => { + const names = await fs.readdir(sessionDir).catch(() => [] as string[]); + return names.some((name) => name.startsWith("chat.jsonl.")); + }; + + if (await hasTmp()) { + return; + } + + await new Promise((resolve, reject) => { + let settled = false; + + const finish = (error?: Error) => { + if (settled) return; + settled = true; + clearInterval(poller); + clearTimeout(timeout); + try { + watcher.close(); + } catch { + // ignore + } + if (error) { + reject(error); + } else { + resolve(); + } + }; + + const timeout = setTimeout( + () => { + finish(new Error(`Timed out waiting for write-file-atomic tmpfile in ${sessionDir}`)); + }, + Math.max(0, deadline - Date.now()) + ); + + // Polling + fs.watch: watch is fast when it works; polling is the fallback. + const poller = setInterval(() => { + void hasTmp().then((ok) => { + if (ok) finish(); + }); + }, 5); + + const watcher = fsSync.watch(sessionDir, { persistent: false }, (_event, filename) => { + if (filename && filename.startsWith("chat.jsonl.")) { + finish(); + return; + } + void hasTmp().then((ok) => { + if (ok) finish(); + }); + }); + }); +} + +describeIntegration("compaction durability", () => { + test.concurrent( + "should not lose history if process is SIGKILLed during atomic replaceChatHistory", + async () => { + if (process.platform === "win32") { + // SIGKILL isn't supported on Windows. + return; + } + + const tempGitRepo = await createTempGitRepo(); + const env1 = await createTestEnvironment(); + + try { + const branchName = generateBranchName("atomic-replace"); + const createResult = await createWorkspace(env1, tempGitRepo, branchName); + expect(createResult.success).toBe(true); + if (!createResult.success) { + throw new Error(createResult.error); + } + + const workspaceId = createResult.metadata.id; + if (!workspaceId) { + throw new Error("Workspace ID not returned from creation"); + } + + // Seed history (no LLM): create a small, valid chat.jsonl we can later replay via ORPC. + const historyService = new HistoryService(env1.config); + const seededMessages = [ + createMuxMessage("seed-1", "user", "hello", {}), + createMuxMessage("seed-2", "assistant", "hi", {}), + createMuxMessage("seed-3", "user", "how are you", {}), + createMuxMessage("seed-4", "assistant", "fine", {}), + ]; + + for (const msg of seededMessages) { + const result = await historyService.appendToHistory(workspaceId, msg); + expect(result.success).toBe(true); + if (!result.success) { + throw new Error(result.error); + } + } + + // Stop the in-process backend, but keep rootDir on disk. + await env1.services.dispose(); + await env1.services.shutdown(); + + const rootDir = env1.tempDir; + const sessionDir = path.join(rootDir, "sessions", workspaceId); + const workerScript = path.join(__dirname, "workers", "replaceChatHistoryWorker.ts"); + + const worker = spawn("bun", [workerScript], { + cwd: process.cwd(), + env: { + ...process.env, + MUX_TEST_ROOT_DIR: rootDir, + MUX_TEST_WORKSPACE_ID: workspaceId, + // Keep the atomic write busy long enough to observe the tmp file. + MUX_TEST_SUMMARY_BYTES: "50000000", + }, + stdio: ["ignore", "ignore", "pipe"], + }); + + let workerStderr = ""; + worker.stderr?.on("data", (chunk) => { + workerStderr += chunk.toString(); + }); + + const exited = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>( + (resolve) => { + worker.once("exit", (code, signal) => resolve({ code, signal })); + } + ); + + const race = await Promise.race([ + waitForChatJsonlTmpFile(sessionDir, 20000).then(() => "tmp" as const), + exited.then(() => "exit" as const), + ]); + + if (race === "tmp") { + // Simulate an abrupt crash. + worker.kill("SIGKILL"); + } + + const exitInfo = await exited; + if (exitInfo.code !== 0 && exitInfo.signal !== "SIGKILL") { + throw new Error( + `Worker failed: code=${exitInfo.code} signal=${exitInfo.signal}\n${workerStderr}` + ); + } + + // Restart backend and assert behaviorally via ORPC history replay. + const env2 = await createTestEnvironmentFromRootDir(rootDir); + try { + const replay = await env2.orpc.workspace.getFullReplay({ workspaceId }); + const replayedMessages = replay.filter(isChatMessage); + + expect([seededMessages.length, 1]).toContain(replayedMessages.length); + + if (replayedMessages.length === 1) { + expect(replayedMessages[0].metadata?.compacted).toBeTruthy(); + } + } finally { + // Best-effort: remove workspace before tearing down rootDir. + try { + await env2.orpc.workspace.remove({ workspaceId, options: { force: true } }); + } catch { + // ignore + } + await cleanupTestEnvironment(env2); + } + } finally { + await cleanupTempGitRepo(tempGitRepo); + } + }, + 60000 + ); +}); diff --git a/tests/ipc/compactionRestartRetry.test.ts b/tests/ipc/compactionRestartRetry.test.ts new file mode 100644 index 0000000000..d43ac5d950 --- /dev/null +++ b/tests/ipc/compactionRestartRetry.test.ts @@ -0,0 +1,261 @@ +import type { WorkspaceChatMessage } from "@/common/orpc/types"; +import { buildCompactionPrompt } from "../../src/common/constants/ui"; +import type { MuxFrontendMetadata } from "../../src/common/types/message"; +import { createMuxMessage } from "../../src/common/types/message"; +import { HistoryService } from "../../src/node/services/historyService"; +import { + createTestEnvironment, + createTestEnvironmentFromRootDir, + cleanupTestEnvironment, + setupProviders, + shouldRunIntegrationTests, + validateApiKeys, + getApiKey, +} from "./setup"; +import { + cleanupTempGitRepo, + createTempGitRepo, + createWorkspace, + createStreamCollector, + generateBranchName, + resolveOrpcClient, + waitFor, + configureTestRetries, +} from "./helpers"; + +// Skip all tests if TEST_INTEGRATION is not set +const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip; + +// Validate API keys before running tests +if (shouldRunIntegrationTests()) { + validateApiKeys(["ANTHROPIC_API_KEY"]); +} + +const PROVIDER = "anthropic"; +const MODEL = "anthropic:claude-haiku-4-5"; + +describeIntegration("compaction restart retry", () => { + configureTestRetries(3); + + test.concurrent( + "should re-run compaction after restart and still auto-send continueMessage", + async () => { + const tempGitRepo = await createTempGitRepo(); + const env1 = await createTestEnvironment(); + let env2: Awaited> | null = null; + let workspaceId: string | undefined; + + try { + // Provider setup (persists into config.json under env1.tempDir) + await setupProviders(env1, { + [PROVIDER]: { + apiKey: getApiKey("ANTHROPIC_API_KEY"), + }, + }); + + const branchName = generateBranchName("compaction-restart"); + const createResult = await createWorkspace(env1, tempGitRepo, branchName); + expect(createResult.success).toBe(true); + if (!createResult.success) { + throw new Error(createResult.error); + } + + workspaceId = createResult.metadata.id; + if (!workspaceId) { + throw new Error("Workspace ID not returned from creation"); + } + + // Seed some history without LLM calls. + const historyService = new HistoryService(env1.config); + const seedMessages = [ + createMuxMessage("seed-u1", "user", "We are testing compaction.", {}), + createMuxMessage("seed-a1", "assistant", "Acknowledged.", {}), + createMuxMessage("seed-u2", "user", "We will compact and then continue.", {}), + createMuxMessage("seed-a2", "assistant", "Understood.", {}), + ]; + for (const msg of seedMessages) { + const result = await historyService.appendToHistory(workspaceId, msg); + expect(result.success).toBe(true); + if (!result.success) { + throw new Error(result.error); + } + } + + // Send a compaction request with a queued continue message. + const continueText = "Reply with the single word BANANA and nothing else."; + const muxMetadata: MuxFrontendMetadata = { + type: "compaction-request", + rawCommand: `/compact\n${continueText}`, + parsed: { + model: MODEL, + maxOutputTokens: 256, + continueMessage: { + text: continueText, + }, + }, + }; + + const collector1 = createStreamCollector(env1.orpc, workspaceId); + collector1.start(); + await collector1.waitForSubscription(5000); + + const compactionPrompt = buildCompactionPrompt(200); + const sendResult = await env1.orpc.workspace.sendMessage({ + workspaceId, + message: compactionPrompt, + options: { + model: MODEL, + mode: "compact", + maxOutputTokens: 256, + toolPolicy: [{ regex_match: ".*", action: "disable" }], + muxMetadata, + }, + }); + expect(sendResult.success).toBe(true); + + // Wait for stream to start. + const streamStart = await collector1.waitForEvent("stream-start", 20000); + expect(streamStart).toBeDefined(); + + // Capture the compaction-request message id (persisted user message). + const gotCompactionRequestId = await waitFor(() => { + const userMsg = collector1 + .getEvents() + .find( + (e: WorkspaceChatMessage) => + "type" in e && + e.type === "message" && + "role" in e && + e.role === "user" && + (e as { metadata?: { muxMetadata?: { type?: string } } }).metadata?.muxMetadata + ?.type === "compaction-request" + ) as (WorkspaceChatMessage & { id?: string }) | undefined; + return Boolean(userMsg?.id); + }, 5000); + expect(gotCompactionRequestId).toBe(true); + + const compactionRequestMsg = collector1 + .getEvents() + .find( + (e: WorkspaceChatMessage) => + "type" in e && + e.type === "message" && + "role" in e && + e.role === "user" && + (e as { metadata?: { muxMetadata?: { type?: string } } }).metadata?.muxMetadata + ?.type === "compaction-request" + ) as WorkspaceChatMessage & { id: string }; + + const compactionRequestId = compactionRequestMsg.id; + expect(compactionRequestId).toBeTruthy(); + + // Crash-like interruption: force a stream error while compaction is running. + const client1 = resolveOrpcClient(env1); + const triggered = await client1.debug.triggerStreamError({ + workspaceId, + errorMessage: "Test-triggered compaction stream error", + }); + expect(triggered).toBe(true); + + const streamError = await collector1.waitForEvent("stream-error", 20000); + expect(streamError).toBeDefined(); + collector1.stop(); + + // Simulate backend restart (preserve rootDir on disk). + await env1.services.dispose(); + await env1.services.shutdown(); + + env2 = await createTestEnvironmentFromRootDir(env1.tempDir); + + const collector2 = createStreamCollector(env2.orpc, workspaceId); + collector2.start(); + await collector2.waitForSubscription(5000); + + // Mimic the renderer's recovery action: retry compaction via sendMessage(editMessageId) + // so that continueMessage is queued again after restart. + const retryResult = await env2.orpc.workspace.sendMessage({ + workspaceId, + message: compactionPrompt, + options: { + model: MODEL, + mode: "compact", + maxOutputTokens: 256, + toolPolicy: [{ regex_match: ".*", action: "disable" }], + muxMetadata, + editMessageId: compactionRequestId, + }, + }); + expect(retryResult.success).toBe(true); + + // Behavioral assertions via the real chat event stream: + // 1) Compaction completes (we see an assistant summary with metadata.compacted) + const sawCompactedSummary = await waitFor(() => { + return collector2.getEvents().some((e: WorkspaceChatMessage) => { + return ( + "type" in e && + e.type === "message" && + "role" in e && + e.role === "assistant" && + Boolean((e as { metadata?: { compacted?: unknown } }).metadata?.compacted) + ); + }); + }, 60000); + expect(sawCompactedSummary).toBe(true); + + // 2) The queued continue message is auto-sent (we see the user message) + const sawContinueUserMessage = await waitFor(() => { + return collector2.getEvents().some((e: WorkspaceChatMessage) => { + if (!("type" in e) || e.type !== "message" || !("role" in e) || e.role !== "user") { + return false; + } + const parts = (e as { parts?: Array<{ type: string; text?: string }> }).parts; + const text = + parts + ?.filter((p) => p.type === "text") + .map((p) => p.text ?? "") + .join("") ?? ""; + return text.includes(continueText); + }); + }, 60000); + expect(sawContinueUserMessage).toBe(true); + + // 3) The follow-up assistant stream completes with BANANA + const sawBanana = await waitFor(() => { + return collector2.getEvents().some((e: WorkspaceChatMessage) => { + if (!("type" in e) || e.type !== "stream-end") { + return false; + } + const parts = (e as { parts?: Array<{ type: string; text?: string }> }).parts; + const text = + parts + ?.filter((p) => p.type === "text") + .map((p) => p.text ?? "") + .join("") ?? ""; + return text.includes("BANANA"); + }); + }, 60000); + expect(sawBanana).toBe(true); + + collector2.stop(); + } finally { + if (env2) { + if (workspaceId) { + try { + // Best-effort: remove workspace to stop MCP servers and clean up worktrees/sessions. + await env2.orpc.workspace.remove({ workspaceId, options: { force: true } }); + } catch { + // ignore + } + } + + await cleanupTestEnvironment(env2); + } else { + await cleanupTestEnvironment(env1); + } + + await cleanupTempGitRepo(tempGitRepo); + } + }, + 240000 + ); +}); diff --git a/tests/ipc/setup.ts b/tests/ipc/setup.ts index 9237bfde27..eb931950ce 100644 --- a/tests/ipc/setup.ts +++ b/tests/ipc/setup.ts @@ -102,6 +102,58 @@ export async function createTestEnvironment(): Promise { }; } +/** + * Create a test environment using an existing mux root directory. + * Useful for simulating an app restart while preserving on-disk state. + */ +export async function createTestEnvironmentFromRootDir(rootDir: string): Promise { + const config = new Config(rootDir); + + // Create mock BrowserWindow + const mockWindow = createMockBrowserWindow(); + + // Create ServiceContainer instance + const services = new ServiceContainer(config); + await services.initialize(); + + // Wire services to the mock BrowserWindow + // Note: Events are consumed via ORPC subscriptions (StreamCollector), not windowService.send() + services.windowService.setMainWindow(mockWindow); + + const orpcContext: ORPCContext = { + config: services.config, + aiService: services.aiService, + projectService: services.projectService, + workspaceService: services.workspaceService, + taskService: services.taskService, + providerService: services.providerService, + terminalService: services.terminalService, + editorService: services.editorService, + windowService: services.windowService, + updateService: services.updateService, + tokenizerService: services.tokenizerService, + serverService: services.serverService, + featureFlagService: services.featureFlagService, + sessionTimingService: services.sessionTimingService, + mcpConfigService: services.mcpConfigService, + mcpServerManager: services.mcpServerManager, + menuEventService: services.menuEventService, + voiceService: services.voiceService, + experimentsService: services.experimentsService, + telemetryService: services.telemetryService, + sessionUsageService: services.sessionUsageService, + }; + const orpc = createOrpcTestClient(orpcContext); + + return { + config, + services, + mockWindow, + tempDir: rootDir, + orpc, + }; +} + /** * Cleanup test environment (remove temporary directory) with retry logic */ diff --git a/tests/ipc/workers/replaceChatHistoryWorker.ts b/tests/ipc/workers/replaceChatHistoryWorker.ts new file mode 100644 index 0000000000..85f198d59e --- /dev/null +++ b/tests/ipc/workers/replaceChatHistoryWorker.ts @@ -0,0 +1,107 @@ +import type { BrowserWindow, WebContents } from "electron"; +import { Config } from "../../../src/node/config"; +import type { ORPCContext } from "../../../src/node/orpc/context"; +import { ServiceContainer } from "../../../src/node/services/serviceContainer"; +import { createMuxMessage } from "../../../src/common/types/message"; +import { createOrpcTestClient } from "../orpcTestClient"; + +function createMockBrowserWindow(): BrowserWindow { + const mockWindow = { + webContents: { + send: () => undefined, + openDevTools: () => undefined, + } as unknown as WebContents, + isDestroyed: () => false, + isMinimized: () => false, + restore: () => undefined, + focus: () => undefined, + loadURL: () => undefined, + on: () => undefined, + setTitle: () => undefined, + } as unknown as BrowserWindow; + + return mockWindow; +} + +async function main(): Promise { + const rootDir = process.env.MUX_TEST_ROOT_DIR; + const workspaceId = process.env.MUX_TEST_WORKSPACE_ID; + const summaryBytesRaw = process.env.MUX_TEST_SUMMARY_BYTES ?? "20000000"; + + if (!rootDir) { + throw new Error("MUX_TEST_ROOT_DIR is required"); + } + if (!workspaceId) { + throw new Error("MUX_TEST_WORKSPACE_ID is required"); + } + + const summaryBytes = Number(summaryBytesRaw); + if (!Number.isFinite(summaryBytes) || summaryBytes <= 0) { + throw new Error(`Invalid MUX_TEST_SUMMARY_BYTES: ${summaryBytesRaw}`); + } + + const config = new Config(rootDir); + const services = new ServiceContainer(config); + await services.initialize(); + + services.windowService.setMainWindow(createMockBrowserWindow()); + + const orpcContext: ORPCContext = { + config: services.config, + aiService: services.aiService, + projectService: services.projectService, + workspaceService: services.workspaceService, + taskService: services.taskService, + providerService: services.providerService, + terminalService: services.terminalService, + editorService: services.editorService, + windowService: services.windowService, + updateService: services.updateService, + tokenizerService: services.tokenizerService, + serverService: services.serverService, + featureFlagService: services.featureFlagService, + sessionTimingService: services.sessionTimingService, + mcpConfigService: services.mcpConfigService, + mcpServerManager: services.mcpServerManager, + menuEventService: services.menuEventService, + voiceService: services.voiceService, + experimentsService: services.experimentsService, + telemetryService: services.telemetryService, + sessionUsageService: services.sessionUsageService, + }; + + const client = createOrpcTestClient(orpcContext); + + // Huge payload to keep the write-file-atomic temp file around long enough + // for the parent Jest process to observe and SIGKILL us. + const summaryText = "X".repeat(summaryBytes); + + const summaryMessage = createMuxMessage( + `compaction-summary-${Date.now()}`, + "assistant", + summaryText, + { + compacted: "user", + } + ); + + const result = await client.workspace.replaceChatHistory({ + workspaceId, + summaryMessage, + }); + + if (!result.success) { + throw new Error(`replaceChatHistory failed: ${result.error}`); + } + + // Best-effort cleanup (won't run under SIGKILL). + await services.dispose(); + await services.shutdown(); +} + +void main().catch((error) => { + const message = error instanceof Error ? (error.stack ?? error.message) : String(error); + // eslint-disable-next-line no-console + console.error(message); + process.exitCode = 1; +});