diff --git a/src/commands/ai/generate/server/AIGenerateServerCommand.ts b/src/commands/ai/generate/server/AIGenerateServerCommand.ts index e1daa4167..d14884e23 100644 --- a/src/commands/ai/generate/server/AIGenerateServerCommand.ts +++ b/src/commands/ai/generate/server/AIGenerateServerCommand.ts @@ -38,7 +38,6 @@ export class AIGenerateServerCommand extends AIGenerateCommand { // Mode selection: RAG context building OR direct messages if (params.roomId) { // RAG MODE: Build context from chat room (SAME code path as PersonaUser) - console.log(`🤖 AI Generate: Building RAG context for room ${params.roomId.slice(0, 8)}...`); // Find persona if not specified let targetPersonaId = params.personaId; @@ -57,7 +56,6 @@ export class AIGenerateServerCommand extends AIGenerateCommand { const personaRecord = usersResult.data[0]; targetPersonaId = personaRecord.id; personaDisplayName = personaRecord.data.displayName; - console.log(`✅ AI Generate: Using persona "${personaRecord.data.displayName}" (${targetPersonaId.slice(0, 8)})`); } // Build RAG context (SAME code as PersonaUser.respondToMessage line 207-215) @@ -134,7 +132,6 @@ export class AIGenerateServerCommand extends AIGenerateCommand { } else if (params.messages) { // DIRECT MODE: Use provided messages - console.log(`🤖 AI Generate: Using provided messages (${params.messages.length})...`); request = paramsToRequest(params); } else { @@ -143,7 +140,6 @@ export class AIGenerateServerCommand extends AIGenerateCommand { // PREVIEW MODE: Return request without calling LLM if (params.preview) { - console.log(`👁️ AI Generate: Preview mode - returning request without LLM call`); const formatted = this.formatRequestPreview(request, ragContext); return createAIGenerateResultFromParams(params, { @@ -156,15 +152,9 @@ export class AIGenerateServerCommand extends AIGenerateCommand { } // GENERATION MODE: Call AIProviderDaemon - console.log(`🤖 AI Generate: Calling LLM with ${request.messages.length} messages...`); const response = await AIProviderDaemon.generateText(request); - - const result = responseToResult(response, params); - console.log(`✅ AI Generate: Generated ${result.usage?.outputTokens} tokens in ${result.responseTimeMs}ms`); - - return result; + return responseToResult(response, params); } catch (error) { - console.error(`❌ AI Generate: Execution failed:`, error); return createErrorResult(params, error instanceof Error ? error.message : String(error)); } } diff --git a/src/daemons/ai-provider-daemon/server/AdapterHealthMonitor.ts b/src/daemons/ai-provider-daemon/server/AdapterHealthMonitor.ts index 3addc4071..71e84a77e 100644 --- a/src/daemons/ai-provider-daemon/server/AdapterHealthMonitor.ts +++ b/src/daemons/ai-provider-daemon/server/AdapterHealthMonitor.ts @@ -173,6 +173,25 @@ export class AdapterHealthMonitor { await Promise.allSettled(checks); } + /** + * Compute per-adapter check interval with exponential backoff. + * Healthy adapters use base interval. Failed adapters back off exponentially + * up to a ceiling, so they still recover when network returns. + * + * Backoff schedule (with 30s base): + * 0 failures → 30s + * 1 failure → 60s + * 2 failures → 120s + * 3 failures → 240s (4 min) + * 5+ failures → 300s (5 min ceiling) + */ + private adapterCheckInterval(state: AdapterHealthState, baseInterval: number): number { + if (state.consecutiveFailures === 0) return baseInterval; + const backoff = baseInterval * Math.pow(2, state.consecutiveFailures); + const ceiling = 5 * 60 * 1000; // 5-minute max — ensures recovery from transient network issues + return Math.min(backoff, ceiling); + } + /** * Check a single adapter's health * Concurrent-safe with per-adapter lock @@ -188,8 +207,11 @@ export class AdapterHealthMonitor { return; } + // Exponential backoff for failing adapters (still retries at ceiling for recovery) + const effectiveInterval = this.adapterCheckInterval(state, checkInterval); + // Check if enough time has passed since last check - if (now - state.lastCheckTime < checkInterval) { + if (now - state.lastCheckTime < effectiveInterval) { return; // Too soon for this adapter } @@ -211,7 +233,12 @@ export class AdapterHealthMonitor { state.consecutiveFailures = 0; } else { state.consecutiveFailures++; - log.warn(`⚠️ ${state.adapter.providerId}: Health check failed (${state.consecutiveFailures} consecutive failures)`); + const nextRetryMs = this.adapterCheckInterval(state, checkInterval); + const nextRetrySec = Math.round(nextRetryMs / 1000); + // Only log on first failure and at backoff milestones (powers of 2) to reduce noise + if (state.consecutiveFailures === 1 || (state.consecutiveFailures & (state.consecutiveFailures - 1)) === 0) { + log.warn(`⚠️ ${state.adapter.providerId}: Health check failed (${state.consecutiveFailures} consecutive, next retry in ${nextRetrySec}s)`); + } // Get max failures threshold from SystemDaemon const systemDaemon = SystemDaemon.sharedInstance(); diff --git a/src/daemons/ai-provider-daemon/shared/adapters/BaseOpenAICompatibleAdapter.ts b/src/daemons/ai-provider-daemon/shared/adapters/BaseOpenAICompatibleAdapter.ts index b90153cab..bb4b836c3 100644 --- a/src/daemons/ai-provider-daemon/shared/adapters/BaseOpenAICompatibleAdapter.ts +++ b/src/daemons/ai-provider-daemon/shared/adapters/BaseOpenAICompatibleAdapter.ts @@ -151,6 +151,10 @@ export abstract class BaseOpenAICompatibleAdapter extends BaseAIProviderAdapter protected readonly config: OpenAICompatibleConfig; protected isInitialized = false; + // Throttle per-status log messages (avoid spamming same error every call) + private _lastStatusLogTime: Map = new Map(); + private readonly _statusLogThrottleMs = 5 * 60 * 1000; // 5 minutes + constructor(config: OpenAICompatibleConfig) { super(); this.config = config; @@ -731,7 +735,13 @@ export abstract class BaseOpenAICompatibleAdapter extends BaseAIProviderAdapter timestamp: Date.now(), }); - this.log(null, 'error', `💰 ${this.providerName}: ${status} - ${errorBody.slice(0, 200)}`); + // Throttle log to once per 5 minutes per status (avoid spamming same error) + const now = Date.now(); + const lastLog = this._lastStatusLogTime.get(status) ?? 0; + if (now - lastLog >= this._statusLogThrottleMs) { + this._lastStatusLogTime.set(status, now); + this.log(null, 'error', `💰 ${this.providerName}: ${status} - ${errorBody.slice(0, 200)}`); + } } throw new Error(`HTTP ${response.status}: ${errorBody}`); diff --git a/src/daemons/data-daemon/server/DatabaseHandleRegistry.ts b/src/daemons/data-daemon/server/DatabaseHandleRegistry.ts index 8d6876baf..d72cc3234 100644 --- a/src/daemons/data-daemon/server/DatabaseHandleRegistry.ts +++ b/src/daemons/data-daemon/server/DatabaseHandleRegistry.ts @@ -161,7 +161,6 @@ export class DatabaseHandleRegistry { // Initialize default handle metadata const expandedDbPath = getDatabasePath(); - console.log(`📦 DatabaseHandleRegistry: Path registry initialized (default db: ${expandedDbPath})`); this.handleMetadata.set(DEFAULT_HANDLE, { adapter: 'rust' as AdapterType, // All I/O goes through Rust @@ -218,7 +217,6 @@ export class DatabaseHandleRegistry { throw new Error('SQLite config requires either "path" or "filename" property'); } // Just register the path - Rust handles actual connections - console.log(`📦 DatabaseHandleRegistry: Registered handle ${handle.substring(0, 8)}... → ${dbPath}`); break; } diff --git a/src/daemons/data-daemon/server/ORMRustClient.ts b/src/daemons/data-daemon/server/ORMRustClient.ts index 7be481d5b..8c442ca25 100644 --- a/src/daemons/data-daemon/server/ORMRustClient.ts +++ b/src/daemons/data-daemon/server/ORMRustClient.ts @@ -143,7 +143,6 @@ export class ORMRustClient { this.socket.on('connect', () => { this.connected = true; this.connecting = false; - console.log('[ORMRustClient] Connected to continuum-core'); resolve(); }); @@ -223,8 +222,8 @@ export class ORMRustClient { const networkAndRustMs = totalMs - timing.stringifyMs - timing.writeMs - parseMs; this.pendingTimings.delete(response.requestId); - // Log slow operations (>50ms threshold matches Rust) - if (totalMs > 50) { + // Log slow operations (>1000ms — raised from 50ms to reduce startup noise) + if (totalMs > 1000) { console.warn(`[ORMRustClient] SLOW IPC: ${timing.command} total=${totalMs}ms (stringify=${timing.stringifyMs}ms write=${timing.writeMs}ms network+rust=${networkAndRustMs}ms parse=${parseMs}ms)`); } } diff --git a/src/daemons/data-daemon/shared/ORMLogger.ts b/src/daemons/data-daemon/shared/ORMLogger.ts index 6017c6a6e..746a70230 100644 --- a/src/daemons/data-daemon/shared/ORMLogger.ts +++ b/src/daemons/data-daemon/shared/ORMLogger.ts @@ -84,8 +84,8 @@ export function logOperationStart( console.log(`[ORM] ${operation} ${collection} completed in ${durationMs}ms`); } - // Warn on slow operations - if (durationMs > 100) { + // Warn on slow operations (>1000ms — raised from 100ms to reduce startup noise) + if (durationMs > 1000) { console.warn(`[ORM] SLOW: ${operation} ${collection} took ${durationMs}ms`); } }; diff --git a/src/daemons/events-daemon/browser/EventsDaemonBrowser.ts b/src/daemons/events-daemon/browser/EventsDaemonBrowser.ts index 4ec7b81fa..8beaa9fd6 100644 --- a/src/daemons/events-daemon/browser/EventsDaemonBrowser.ts +++ b/src/daemons/events-daemon/browser/EventsDaemonBrowser.ts @@ -32,15 +32,16 @@ export class EventsDaemonBrowser extends EventsDaemon implements IEventSubscript private domEventBridge: DOMEventBridge; private subscriptionManager = new EventSubscriptionManager(); + /** + * Registry of event names with active DOM listeners. + * DOM CustomEvent dispatch is skipped for events not in this set. + * Widgets register via registerDOMInterest() when they need document-level events. + */ + private static _domInterest = new Set(); + constructor(context: JTAGContext, router: JTAGRouter) { super(context, router); - // Reduce log spam - debug logs removed - // console.log(`🔥 CLAUDE-BROWSER-DAEMON-DEBUG-${Date.now()}: EventsDaemonBrowser constructor called!`); - // console.log(`🔥 Context: ${context.environment}/${context.uuid}`); - // console.log(`🔥 ENDPOINT-DEBUG: EventsDaemonBrowser.subpath = "${this.subpath}"`); - // console.log(`🔥 ENDPOINT-DEBUG: Expected browser endpoint should be "browser/${this.subpath}"`); - // Setup DOM event bridge for widget communication this.domEventBridge = new DOMEventBridge(this.eventManager); verbose() && console.log('🌉 EventsDaemonBrowser: DOM event bridge initialized'); @@ -56,33 +57,70 @@ export class EventsDaemonBrowser extends EventsDaemon implements IEventSubscript } /** - * Handle local event bridging - emit to event system AND DOM for BaseWidget + * Register interest in receiving DOM CustomEvents for a specific event name. + * Only events with registered interest will be dispatched to the document. + * Returns an unregister function. */ - protected handleLocalEventBridge(eventName: string, eventData: unknown): void { - // 1. Emit to local event system - DOMEventBridge will automatically handle DOM dispatch - this.eventManager.events.emit(eventName, eventData); + public static registerDOMInterest(eventName: string): () => void { + EventsDaemonBrowser._domInterest.add(eventName); + return () => { + EventsDaemonBrowser._domInterest.delete(eventName); + }; + } - // 2. Dispatch DOM event for BaseWidget integration (backward compatibility) - const domEvent = new CustomEvent(eventName, { - detail: eventData - }); + /** + * Check if anything has registered DOM interest for this event name. + * Checks both: + * - Events.domInterest (populated by Events.subscribe() in browser) + * - _domInterest (populated by registerDOMInterest() from BaseWidget/WidgetEventServiceBrowser) + * Uses prefix matching: 'data:chat_messages' matches 'data:chat_messages:created'. + */ + private hasDOMInterest(eventName: string): boolean { + // Direct match in either registry + if (Events.domInterest.has(eventName)) return true; + if (EventsDaemonBrowser._domInterest.has(eventName)) return true; - // Type-safe document access for browser environment - if (typeof globalThis !== 'undefined' && 'document' in globalThis) { - (globalThis as typeof globalThis & { document: Document }).document.dispatchEvent(domEvent); + // Prefix match against both registries + for (const interest of Events.domInterest) { + if (eventName.startsWith(interest + ':') || interest.startsWith(eventName + ':')) return true; } + for (const interest of EventsDaemonBrowser._domInterest) { + if (eventName.startsWith(interest + ':') || interest.startsWith(eventName + ':')) return true; + } + return false; + } + + /** + * Handle local event bridging - emit to event system AND DOM for BaseWidget + * + * Dispatch order: + * 1. Internal EventEmitter (DOMEventBridge handles mapped events) + * 2. SubscriptionManager (exact, wildcard, elegant pattern matching) + * 3. Legacy wildcard subscriptions + * 4. DOM CustomEvent ONLY if a widget registered interest (filter-first, not spam-then-filter) + */ + protected handleLocalEventBridge(eventName: string, eventData: unknown): void { + // 1. Emit to local event system — DOMEventBridge handles its mapped events + this.eventManager.events.emit(eventName, eventData); - // 3. Trigger unified subscription manager (NEW!) - // This handles exact, wildcard, and elegant pattern subscriptions + // 2. Trigger unified subscription manager (exact, wildcard, and elegant patterns) this.subscriptionManager.trigger(eventName, eventData); - // 4. Legacy: Also check wildcard subscriptions from Events.subscribe() - // TODO: Migrate to unified subscription manager + // 3. Legacy: Also check wildcard subscriptions from Events.subscribe() try { Events.checkWildcardSubscriptions(eventName, eventData); } catch (error) { console.error('Failed to check wildcard subscriptions:', error); } + + // 4. DOM dispatch — ONLY if a widget registered interest for this event namespace + // This prevents creating DOM CustomEvents for high-frequency events no widget cares about + if (this.hasDOMInterest(eventName)) { + if (typeof globalThis !== 'undefined' && 'document' in globalThis) { + const domEvent = new CustomEvent(eventName, { detail: eventData }); + (globalThis as typeof globalThis & { document: Document }).document.dispatchEvent(domEvent); + } + } } /** diff --git a/src/daemons/events-daemon/shared/EventsDaemon.ts b/src/daemons/events-daemon/shared/EventsDaemon.ts index 9964bc682..4ee1a744c 100644 --- a/src/daemons/events-daemon/shared/EventsDaemon.ts +++ b/src/daemons/events-daemon/shared/EventsDaemon.ts @@ -17,9 +17,9 @@ import { DaemonBase } from '../../command-daemon/shared/DaemonBase'; class EventRateLimiter { private counts = new Map(); private windowStart = Date.now(); - private readonly windowMs = 100; // 100ms window - private readonly maxPerWindow = 20; // Max 20 of same event per window - private readonly warnThreshold = 10; // Warn at 10+ per window + private readonly windowMs = 1000; // 1-second window (matches Rust-side rate limiter) + private readonly maxPerWindow = 200; // Max 200 of same event per second + private readonly warnThreshold = 100; // Warn at 100+ per second private blocked = new Set(); private warned = new Set(); // Track warned events to avoid spam @@ -40,7 +40,7 @@ class EventRateLimiter { .sort((a, b) => b[1] - a[1]); if (hotEvents.length > 0) { - console.warn(`⚠️ EVENT ACTIVITY: ${hotEvents.map(([e, c]) => `${e}(${c})`).join(', ')}`); + console.warn(`[EventRateLimiter] EVENT ACTIVITY: ${hotEvents.map(([e, c]) => `${e}(${c})`).join(', ')}`); } } @@ -63,7 +63,7 @@ class EventRateLimiter { if (count === this.warnThreshold && !this.warned.has(eventName)) { this.warned.add(eventName); this.totalWarned++; - console.warn(`⚠️ EVENT TRENDING: "${eventName}" at ${count}x in ${this.windowMs}ms (blocking at ${this.maxPerWindow})`); + console.warn(`[EventRateLimiter] EVENT TRENDING: "${eventName}" at ${count}x in ${this.windowMs}ms (blocking at ${this.maxPerWindow})`); } // Block if over threshold @@ -75,7 +75,7 @@ class EventRateLimiter { if (this.blockedHistory.length > 100) { this.blockedHistory.shift(); } - console.error(`🛑 EVENT CASCADE BLOCKED: "${eventName}" fired ${count}x in ${this.windowMs}ms`); + console.error(`[EventRateLimiter] EVENT CASCADE BLOCKED: "${eventName}" fired ${count}x in ${this.windowMs}ms`); return true; } diff --git a/src/daemons/session-daemon/server/SessionDaemonServer.ts b/src/daemons/session-daemon/server/SessionDaemonServer.ts index 619dcc6e0..735cd3b7f 100644 --- a/src/daemons/session-daemon/server/SessionDaemonServer.ts +++ b/src/daemons/session-daemon/server/SessionDaemonServer.ts @@ -24,6 +24,7 @@ import { UserStateEntity } from '../../../system/data/entities/UserStateEntity'; import { UserIdentityResolver } from '../../../system/user/shared/UserIdentityResolver'; import { Logger } from '../../../system/core/logging/Logger'; import { SystemPaths } from '../../../system/core/config/SystemPaths'; +import { UserEntityCache } from '../../../system/user/server/UserEntityCache'; import { type SessionMetadata, type CreateSessionParams, @@ -413,20 +414,15 @@ export class SessionDaemonServer extends SessionDaemon { * Load existing user (citizen) by ID */ private async getUserById(userId: UUID): Promise { - // DEBUG: Always log getUserById calls to trace identity bugs - BYPASS LOGGER - console.error(`🔍🔍🔍 getUserById CALLED: userId=${JSON.stringify(userId)}, type=${typeof userId}, value=${userId}`); - this.log.info(`🔍 getUserById: userId=${JSON.stringify(userId)}, type=${typeof userId}, stringified=${String(userId)}`); - - // CRITICAL: Validate userId is not the string "undefined" (indicates serialization bug upstream) + // Validate userId is not the string "undefined" (indicates serialization bug upstream) if (!userId || userId === 'undefined' || (userId as any) === undefined) { - console.error(`❌❌❌ getUserById VALIDATION FAILED: userId=${userId}`); - this.log.error(`❌ getUserById called with invalid userId`); + this.log.error(`getUserById called with invalid userId: ${userId}`); this.log.error(`Stack trace:`, new Error().stack); throw new Error(`Invalid userId: ${userId} - this indicates a bug in session creation or identity resolution`); } - // Load UserEntity from database - const userEntity = await ORM.read(COLLECTIONS.USERS, userId); + // Load UserEntity from cache (eliminates redundant ORM reads) + const userEntity = await UserEntityCache.instance.read(userId); if (!userEntity) { throw new Error(`User ${userId} not found in database`); } @@ -476,51 +472,32 @@ export class SessionDaemonServer extends SessionDaemon { * This avoids hardcoding names and works with any seeded owner. */ private async findSeededHumanOwner(): Promise { - console.error(`🔍🔍🔍 findSeededHumanOwner: Starting search...`); - // Look for all human users const result = await ORM.query({ collection: COLLECTIONS.USERS, filter: { type: 'human' } }); - console.error(`🔍🔍🔍 findSeededHumanOwner: Query result - success=${result.success}, hasData=${!!result.data}, count=${result.data?.length || 0}`); - if (!result.success || !result.data || result.data.length === 0) { - console.error(`🔍🔍🔍 findSeededHumanOwner: No human users found, returning null`); return null; } - // Debug: log all human users - console.error(`🔍🔍🔍 findSeededHumanOwner: Found ${result.data.length} human users:`); - result.data.forEach((record, i) => { - console.error(` [${i}] uniqueId="${record.data.uniqueId}", id="${record.id}", recordId="${record.data.id}", startsWithAnon=${record.data.uniqueId?.startsWith('anon-')}`); - }); - // Find the first non-anonymous human (uniqueId doesn't start with "anon-") - // DataRecord wraps entity in .data property - // CRITICAL FIX: Use record.id instead of record.data.id (DataRecord issue) + // DataRecord wraps entity in .data property — use record.id, not entity.id const seededOwner = result.data.find(record => { const entity = record.data; - const matches = entity.uniqueId && !entity.uniqueId.startsWith('anon-') && record.id; // Use record.id, not entity.id! - console.error(`🔍🔍🔍 findSeededHumanOwner: Checking uniqueId="${entity.uniqueId}", record.id="${record.id}", matches=${matches}`); - return matches; + return entity.uniqueId && !entity.uniqueId.startsWith('anon-') && record.id; }); - console.error(`🔍🔍🔍 findSeededHumanOwner: Found seededOwner=${seededOwner ? seededOwner.data.uniqueId : 'null'}`); - if (!seededOwner) { - console.error(`🔍🔍🔍 findSeededHumanOwner: No non-anonymous human found, returning null`); return null; } - // CRITICAL FIX: Use record.id instead of record.data.id if (!seededOwner.id) { - console.error(`❌ findSeededHumanOwner: Found seeded owner but record.id is undefined! uniqueId=${seededOwner.data.uniqueId}`); + this.log.error(`findSeededHumanOwner: Found seeded owner but record.id is undefined! uniqueId=${seededOwner.data.uniqueId}`); return null; } - console.error(`🔍🔍🔍 findSeededHumanOwner: Loading user with id=${seededOwner.id}`); return await this.getUserById(seededOwner.id); } @@ -711,54 +688,38 @@ export class SessionDaemonServer extends SessionDaemon { case 'browser-ui': { // Browser identity: Use deviceId to find/create user // Server is source of truth - browser doesn't send userId - console.error(`🌐🌐🌐 BROWSER-UI CASE ENTERED`); - this.log.info(`🌐 Browser-ui session: resolving human identity from deviceId`); + this.log.info(`Browser-ui session: resolving human identity from deviceId`); const deviceId = identity?.deviceId; - console.error(`🌐🌐🌐 deviceId: ${deviceId}`); if (deviceId) { - console.error(`🌐🌐🌐 deviceId EXISTS, calling findUserByDeviceId...`); // Look for existing user associated with this device const existingUser = await this.findUserByDeviceId(deviceId); - console.error(`🌐🌐🌐 findUserByDeviceId returned: ${existingUser ? existingUser.displayName : 'null'}`); if (existingUser) { user = existingUser; - console.error(`🌐🌐🌐 ASSIGNED user from existingUser: ${user.displayName}`); - this.log.info(`✅ Found existing user for device: ${user.displayName} (${user.id.slice(0, 8)}...)`); + this.log.info(`Found existing user for device: ${user.displayName} (${user.id.slice(0, 8)}...)`); } else { - console.error(`🌐🌐🌐 No existingUser, checking for seeded owner...`); // New device - check for seeded owner (human without anon- prefix) const seededOwner = await this.findSeededHumanOwner(); - console.error(`🌐🌐🌐 findSeededHumanOwner returned: ${seededOwner ? seededOwner.displayName : 'null'}`); if (seededOwner) { user = seededOwner; - console.error(`🌐🌐🌐 ASSIGNED user from seededOwner: ${user.displayName}`); - this.log.info(`✅ Associating new device with seeded owner: ${user.displayName}`); + this.log.info(`Associating new device with seeded owner: ${user.displayName}`); } else { - console.error(`🌐🌐🌐 No seededOwner, creating anonymous human...`); - this.log.info(`📝 New device ${deviceId.slice(0, 12)}... - creating anonymous human`); + this.log.info(`New device ${deviceId.slice(0, 12)}... - creating anonymous human`); user = await this.createAnonymousHuman(params, deviceId); - console.error(`🌐🌐🌐 ASSIGNED user from createAnonymousHuman: ${user.displayName}`); } } } else { - console.error(`🌐🌐🌐 NO deviceId, checking for seeded owner...`); // No deviceId - check for seeded owner first const seededOwner = await this.findSeededHumanOwner(); - console.error(`🌐🌐🌐 findSeededHumanOwner returned: ${seededOwner ? seededOwner.displayName : 'null'}`); if (seededOwner) { user = seededOwner; - console.error(`🌐🌐🌐 ASSIGNED user from seededOwner (no deviceId): ${user.displayName}`); - this.log.info(`✅ Using seeded owner: ${user.displayName} (no deviceId)`); + this.log.info(`Using seeded owner: ${user.displayName} (no deviceId)`); } else { - console.error(`🌐🌐🌐 No seededOwner (no deviceId), creating anonymous human...`); - this.log.info(`📝 No deviceId - creating anonymous human`); + this.log.info(`No deviceId - creating anonymous human`); user = await this.createAnonymousHuman(params, undefined); - console.error(`🌐🌐🌐 ASSIGNED user from createAnonymousHuman (no deviceId): ${user.displayName}`); } } - console.error(`🌐🌐🌐 BROWSER-UI CASE COMPLETE, user: ${user ? user.displayName : 'UNDEFINED!!!'}`); break; } diff --git a/src/daemons/user-daemon/server/UserDaemonServer.ts b/src/daemons/user-daemon/server/UserDaemonServer.ts index d71ff4ab4..21f9e29f1 100644 --- a/src/daemons/user-daemon/server/UserDaemonServer.ts +++ b/src/daemons/user-daemon/server/UserDaemonServer.ts @@ -23,6 +23,7 @@ import { JTAGClientServer } from '../../../system/core/client/server/JTAGClientS import { AIDecisionLogger } from '../../../system/ai/server/AIDecisionLogger'; import { Logger, type ComponentLogger } from '../../../system/core/logging/Logger'; import { SystemPaths } from '../../../system/core/config/SystemPaths'; +import { UserEntityCache } from '../../../system/user/server/UserEntityCache'; export class UserDaemonServer extends UserDaemon { private static instance: UserDaemonServer | null = null; @@ -237,10 +238,12 @@ export class UserDaemonServer extends UserDaemon { this.log.info(`🔧 UserDaemon: Found ${personas.length} personas to initialize`); - // Ensure each persona has correct state - for (const persona of personas) { - this.log.info(`🔧 UserDaemon: Processing persona: ${persona.displayName}`); - await this.ensurePersonaCorrectState(persona); + // Batched parallel initialization — 6 concurrent to avoid thundering herd on DB/Rust + const BATCH_SIZE = 6; + for (let i = 0; i < personas.length; i += BATCH_SIZE) { + const batch = personas.slice(i, i + BATCH_SIZE); + this.log.info(`🔧 UserDaemon: Initializing batch ${Math.floor(i / BATCH_SIZE) + 1}/${Math.ceil(personas.length / BATCH_SIZE)} (${batch.map(p => p.displayName).join(', ')})`); + await Promise.all(batch.map(p => this.ensurePersonaCorrectState(p))); } this.log.info(`✅ UserDaemon: ensurePersonaClients() complete - processed ${personas.length} personas`); @@ -386,10 +389,10 @@ export class UserDaemonServer extends UserDaemon { * Start continuous monitoring loops (using base class interval management) */ protected startMonitoringLoops(): boolean { - // User monitoring loop - every 5 seconds + // User monitoring loop - every 30 seconds (was 5s — caused 134+ redundant ORM reads) this.registerInterval('user-monitoring', async () => { await this.runUserMonitoringLoop(); - }, 5000); + }, 30000); // State reconciliation loop - every 30 seconds this.registerInterval('state-reconciliation', async () => { @@ -428,6 +431,9 @@ export class UserDaemonServer extends UserDaemon { id: r.id } as UserEntity)); + // Populate UserEntityCache — eliminates redundant ORM reads for CallerDetector et al. + UserEntityCache.instance.setAll(users); + // Check each user for (const user of users) { if (user.type === 'persona') { diff --git a/src/generated-command-schemas.json b/src/generated-command-schemas.json index c1e04b7eb..f6ea63aab 100644 --- a/src/generated-command-schemas.json +++ b/src/generated-command-schemas.json @@ -1,5 +1,5 @@ { - "generated": "2026-02-18T23:08:26.754Z", + "generated": "2026-02-19T04:38:24.449Z", "version": "1.0.0", "commands": [ { diff --git a/src/package-lock.json b/src/package-lock.json index 83a688a2c..aa28cc90c 100644 --- a/src/package-lock.json +++ b/src/package-lock.json @@ -1,12 +1,12 @@ { "name": "@continuum/jtag", - "version": "1.0.8074", + "version": "1.0.8082", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@continuum/jtag", - "version": "1.0.8074", + "version": "1.0.8082", "hasInstallScript": true, "license": "MIT", "dependencies": { diff --git a/src/package.json b/src/package.json index ede9b802e..d38b49d37 100644 --- a/src/package.json +++ b/src/package.json @@ -1,6 +1,6 @@ { "name": "@continuum/jtag", - "version": "1.0.8074", + "version": "1.0.8082", "description": "Global CLI debugging system for any Node.js project. Install once globally, use anywhere: npm install -g @continuum/jtag", "config": { "active_example": "widget-ui", diff --git a/src/shared/ipc/logger/LoggerMessageTypes.ts b/src/shared/ipc/logger/LoggerMessageTypes.ts index 2c7d3e11d..7e73c9b82 100644 --- a/src/shared/ipc/logger/LoggerMessageTypes.ts +++ b/src/shared/ipc/logger/LoggerMessageTypes.ts @@ -43,6 +43,21 @@ export interface WriteLogResult { bytesWritten: number; } +/** + * Payload for log/write-batch requests. + * Sends multiple log entries in a single IPC call. + */ +export interface WriteLogBatchPayload { + entries: WriteLogPayload[]; +} + +/** + * Result of log/write-batch command. + */ +export interface WriteLogBatchResult { + entriesQueued: number; +} + /** * Payload for flush-logs requests. */ diff --git a/src/shared/ipc/logger/LoggerWorkerClient.ts b/src/shared/ipc/logger/LoggerWorkerClient.ts index 4db453672..c98104be0 100644 --- a/src/shared/ipc/logger/LoggerWorkerClient.ts +++ b/src/shared/ipc/logger/LoggerWorkerClient.ts @@ -30,6 +30,8 @@ import { WorkerClient, WorkerClientConfig } from '../WorkerClient.js'; import { WriteLogPayload, WriteLogResult, + WriteLogBatchPayload, + WriteLogBatchResult, FlushLogsPayload, FlushLogsResult, PingPayload, @@ -45,8 +47,8 @@ import { * Type-safe client for Logger Rust worker. */ export class LoggerWorkerClient extends WorkerClient< - WriteLogPayload | FlushLogsPayload | PingPayload, - WriteLogResult | FlushLogsResult | PingResult + WriteLogPayload | WriteLogBatchPayload | FlushLogsPayload | PingPayload, + WriteLogResult | WriteLogBatchResult | FlushLogsResult | PingResult > { constructor(config: WorkerClientConfig | string) { // Allow simple socket path string or full config @@ -204,19 +206,19 @@ export class LoggerWorkerClient extends WorkerClient< // ============================================================================ /** - * Write multiple log messages in batch (future enhancement). + * Write multiple log messages in a single IPC call. * - * NOTE: Currently sends messages individually. A future optimization would - * be to add a 'write-logs-batch' message type to the Rust worker. + * Sends one log/write-batch command instead of N individual log/write calls, + * reducing IPC overhead by ~100x during high-throughput logging. * * @param payloads - Array of log messages to write - * @returns Promise resolving to array of results + * @returns Promise resolving to batch result */ async writeLogsBatch( payloads: WriteLogPayload[] - ): Promise { - const promises = payloads.map(payload => this.writeLog(payload)); - return Promise.all(promises); + ): Promise { + const response = await this.send('log/write-batch', { entries: payloads } as WriteLogBatchPayload); + return response.payload as WriteLogBatchResult; } } diff --git a/src/shared/version.ts b/src/shared/version.ts index edc0651d9..40c1e8178 100644 --- a/src/shared/version.ts +++ b/src/shared/version.ts @@ -3,5 +3,5 @@ * DO NOT EDIT MANUALLY */ -export const VERSION = '1.0.8074'; +export const VERSION = '1.0.8082'; export const PACKAGE_NAME = '@continuum/jtag'; diff --git a/src/system/core/client/server/JTAGClientServer.ts b/src/system/core/client/server/JTAGClientServer.ts index da3f87c44..3833e9129 100644 --- a/src/system/core/client/server/JTAGClientServer.ts +++ b/src/system/core/client/server/JTAGClientServer.ts @@ -70,26 +70,21 @@ export class JTAGClientServer extends JTAGClient { * Server clients don't need session storage - base class handles session management * Base class already updates this._session which is used by sessionId getter */ - protected updateClientSessionStorage(sessionId: UUID): void { + protected updateClientSessionStorage(_sessionId: UUID): void { // No-op for server clients - session already updated by base class - console.log(`🏷️ JTAGClientServer: Session updated to: ${sessionId} (managed by base class)`); } - - protected async getLocalSystem(): Promise { - // FIXED: Never auto-create systems - only connect to existing ones - // This prevents server clients from automatically creating new JTAG systems - // when they should connect to existing systems (like test-bench on port WS_PORT) - console.log(`🔍 getLocalSystem() CHECK at ${new Date().toISOString()}: instance=${JTAGSystemServer.instance ? 'EXISTS' : 'NULL'}`); + protected async getLocalSystem(): Promise { + // Never auto-create systems - only connect to existing ones. + // This prevents server clients from creating new JTAG systems + // when they should connect to existing ones. // Only return existing instance if it's already running in same process if (JTAGSystemServer.instance) { - console.log('🏠 JTAGClientServer: Found existing local system instance'); return JTAGSystemServer.instance; } // Force remote connection for all other cases - console.log('🌐 JTAGClientServer: No local system - using remote connection'); return null; } diff --git a/src/system/core/client/shared/JTAGClient.ts b/src/system/core/client/shared/JTAGClient.ts index 175b4705e..b306e5d2f 100644 --- a/src/system/core/client/shared/JTAGClient.ts +++ b/src/system/core/client/shared/JTAGClient.ts @@ -870,6 +870,10 @@ export abstract class JTAGClient extends JTAGBase implements ITransportHandler { await this.connectionBroker.shutdown(); verbose('✅ JTAGClient: Connection Broker shut down'); } + + // Cleanup ResponseCorrelator — stops periodic cleanup timer and rejects orphaned requests + this.responseCorrelator.destroy(); + verbose('✅ JTAGClient: ResponseCorrelator destroyed'); } /** diff --git a/src/system/core/logging/ComponentLogger.ts b/src/system/core/logging/ComponentLogger.ts index ec7225500..888fed19f 100644 --- a/src/system/core/logging/ComponentLogger.ts +++ b/src/system/core/logging/ComponentLogger.ts @@ -17,19 +17,12 @@ import type { LoggerConfig, LogCategory } from './LoggerTypes'; import { LogLevel } from './LoggerTypes'; import type { LogLevel as WorkerLogLevel } from '../../../shared/ipc/logger/LoggerMessageTypes'; import { LogLevelRegistry } from './LogLevelRegistry'; +import type { LogBatcher } from './LogBatcher'; /** Interface for the parent logger (to avoid circular imports) */ export interface ParentLogger { queueMessage(logFile: string, message: string): void; - workerClient: { - writeLog(entry: { - category: string; - level: WorkerLogLevel; - component: string; - message: string; - args?: any[]; - }): Promise; // Returns WriteLogResult but we don't use it - } | null; + logBatcher: LogBatcher | null; useRustLogger: boolean; logDir: string; } @@ -74,9 +67,9 @@ export class ComponentLogger { } } - // File output - route to Rust worker OR TypeScript queue + // File output - route to Rust worker (via LogBatcher) OR TypeScript queue if (this.parentLogger && this.logFilePath) { - if (this.parentLogger.useRustLogger && this.parentLogger.workerClient) { + if (this.parentLogger.useRustLogger && this.parentLogger.logBatcher) { this.sendToWorker(level as WorkerLogLevel, message, args, timestamp); } else { const formattedArgs = args.length > 0 @@ -127,15 +120,13 @@ export class ComponentLogger { } } - if (this.parentLogger.workerClient) { - this.parentLogger.workerClient.writeLog({ + if (this.parentLogger.logBatcher) { + this.parentLogger.logBatcher.queue({ category, level, component: this.component, message, args: args.length > 0 ? args : undefined - }).catch((err) => { - console.error(`[Logger] Rust worker write failed for ${this.component}:`, err.message); }); } } diff --git a/src/system/core/logging/LogBatcher.ts b/src/system/core/logging/LogBatcher.ts new file mode 100644 index 000000000..bbdae66e8 --- /dev/null +++ b/src/system/core/logging/LogBatcher.ts @@ -0,0 +1,68 @@ +/** + * LogBatcher - Batches log messages before sending to Rust worker + * + * Reduces IPC calls from ~1000/sec to ~10/sec by buffering WriteLogPayload + * entries and flushing them as a single batch command. + * + * Flush triggers: + * - Every 100ms (timer) + * - When 50 messages are queued (capacity) + * + * All sends are fire-and-forget — log delivery is best-effort. + */ + +import type { WriteLogPayload } from '../../../shared/ipc/logger/LoggerMessageTypes'; +import type { LoggerWorkerClient } from '../../../shared/ipc/logger/LoggerWorkerClient'; + +export class LogBatcher { + private _buffer: WriteLogPayload[] = []; + private _flushTimer: ReturnType | null = null; + private readonly _flushIntervalMs = 100; + private readonly _maxBatchSize = 50; + private _destroyed = false; + + constructor(private readonly _workerClient: LoggerWorkerClient) {} + + /** + * Queue a log payload for batched delivery. + * Never blocks — worst case the buffer grows until next flush. + */ + queue(payload: WriteLogPayload): void { + if (this._destroyed) return; + + this._buffer.push(payload); + + if (this._buffer.length >= this._maxBatchSize) { + this.flush(); + } else if (!this._flushTimer) { + this._flushTimer = setTimeout(() => this.flush(), this._flushIntervalMs); + } + } + + /** + * Flush all buffered messages to the Rust worker as a single batch. + * Fire-and-forget — errors are silently dropped (logging errors in a logger = infinite loop). + */ + flush(): void { + if (this._flushTimer) { + clearTimeout(this._flushTimer); + this._flushTimer = null; + } + + if (this._buffer.length === 0) return; + + const batch = this._buffer; + this._buffer = []; + + // Single IPC call for entire batch + this._workerClient.writeLogsBatch(batch).catch(() => {}); + } + + /** + * Flush remaining messages and stop accepting new ones. + */ + destroy(): void { + this._destroyed = true; + this.flush(); + } +} diff --git a/src/system/core/logging/Logger.ts b/src/system/core/logging/Logger.ts index 933d9883b..ba0e37318 100644 --- a/src/system/core/logging/Logger.ts +++ b/src/system/core/logging/Logger.ts @@ -53,6 +53,7 @@ import { LogLevel, FileMode, createLoggerConfig, parseFileMode } from './LoggerT import type { LoggerConfig, LogCategory } from './LoggerTypes'; import { inferCategory } from './CategoryInference'; import { ComponentLogger, type ParentLogger } from './ComponentLogger'; +import { LogBatcher } from './LogBatcher'; import { LogLevelRegistry } from './LogLevelRegistry'; // Re-export types for consumers @@ -100,7 +101,8 @@ class LoggerClass implements ParentLogger { private readonly MAX_QUEUE_SIZE = 1000; // Max buffered messages // ParentLogger interface - public for ComponentLogger access - public workerClient: LoggerWorkerClient | null = null; + private workerClient: LoggerWorkerClient | null = null; + public logBatcher: LogBatcher | null = null; public useRustLogger: boolean = USE_RUST_LOGGER; public logDir: string; @@ -152,6 +154,8 @@ class LoggerClass implements ParentLogger { if (this.config.enableConsoleLogging) { console.log('🦀 [Logger] Connected to continuum-core LoggerModule'); } + // Initialize batcher now that connection is live + this.logBatcher = new LogBatcher(this.workerClient!); }) .catch((err) => { console.error('⚠️⚠️⚠️ [Logger] CONTINUUM-CORE CONNECTION FAILED - FALLING BACK TO TYPESCRIPT LOGGING ⚠️⚠️⚠️'); @@ -159,6 +163,7 @@ class LoggerClass implements ParentLogger { console.error('⚠️ [Logger] Error:', err.message); console.error('⚠️ [Logger] To start workers: npm run worker:start'); this.workerClient = null; + this.logBatcher = null; }); } @@ -398,6 +403,10 @@ class LoggerClass implements ParentLogger { this.logQueues.clear(); this.logTimers.clear(); + if (this.logBatcher) { + this.logBatcher.destroy(); + } + if (this.workerClient) { this.workerClient.disconnect(); } diff --git a/src/system/core/shared/Events.ts b/src/system/core/shared/Events.ts index 3dd213655..39628f491 100644 --- a/src/system/core/shared/Events.ts +++ b/src/system/core/shared/Events.ts @@ -44,6 +44,13 @@ export interface EventEmitOptions { * auto-handles scoping. Developer just provides event + data = magic! */ export class Events { + /** + * Registry of event names with active DOM listeners. + * Used by EventsDaemonBrowser to filter DOM dispatch — only events in this set get dispatched. + * Events.subscribe() registers here automatically in browser environment. + */ + static readonly domInterest = new Set(); + /** * ✨ Universal event emission - works in server, browser, shared code * @@ -366,9 +373,12 @@ export class Events { document.addEventListener(patternOrEventName, eventHandler); + // Register DOM interest so EventsDaemonBrowser dispatches this event to DOM + this.domInterest.add(patternOrEventName); + return () => { document.removeEventListener(patternOrEventName, eventHandler); - //console.log(`🔌 Events: Unsubscribed from ${patternOrEventName}`); + this.domInterest.delete(patternOrEventName); }; } else { // Server environment - store exact-match subscriptions in map diff --git a/src/system/core/shared/ResponseCorrelator.ts b/src/system/core/shared/ResponseCorrelator.ts index 2582fef42..46199089c 100644 --- a/src/system/core/shared/ResponseCorrelator.ts +++ b/src/system/core/shared/ResponseCorrelator.ts @@ -23,9 +23,12 @@ export interface CorrelatorStatus { export class ResponseCorrelator { private pendingRequests = new Map(); private defaultTimeoutMs: number; + private cleanupTimer?: ReturnType; constructor(defaultTimeoutMs: number = 30000) { this.defaultTimeoutMs = defaultTimeoutMs; + // Periodic sweep for orphaned requests (every 60s) + this.cleanupTimer = setInterval(() => this.cleanup(), 60_000); } /** @@ -148,4 +151,16 @@ export class ResponseCorrelator { get pendingCount(): number { return this.pendingRequests.size; } + + /** + * Stop cleanup timer and reject all pending requests. + * Call on disconnect or shutdown to prevent orphaned timers. + */ + destroy(): void { + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + this.cleanupTimer = undefined; + } + this.rejectAll('Correlator destroyed'); + } } \ No newline at end of file diff --git a/src/system/core/shared/TimingHarness.ts b/src/system/core/shared/TimingHarness.ts index 355ff6a6b..b4bb6c690 100644 --- a/src/system/core/shared/TimingHarness.ts +++ b/src/system/core/shared/TimingHarness.ts @@ -335,8 +335,9 @@ export class TimingCollector { } } - // Console debug log for slow operations (>500ms — raised from 100ms to reduce spam) - if (timing.totalMs > 500) { + // Console debug log for slow operations (>500ms, excluding fire-and-forget operations) + // log/write-batch has a 10s timeout by design — not a real slowness indicator + if (timing.totalMs > 500 && !timing.operation.includes('log/write-batch')) { const phases = Object.entries(timing.phases) .map(([k, v]) => `${k}=${(v / 1000).toFixed(1)}ms`) .join(', '); diff --git a/src/system/resources/shared/ResourceManager.ts b/src/system/resources/shared/ResourceManager.ts index 3392322f2..794278bb9 100644 --- a/src/system/resources/shared/ResourceManager.ts +++ b/src/system/resources/shared/ResourceManager.ts @@ -38,7 +38,7 @@ export class ResourceManager { availableMemory: 16384 }; - console.log('🔧 ResourceManager: Initialized with moderator:', this.moderator.constructor.name); + // ResourceManager initialized with moderator } /** @@ -72,7 +72,7 @@ export class ResourceManager { lastActivityTime: Date.now() }); - console.log(`🔧 ResourceManager: Registered adapter ${displayName} (GPU: ${quota}MB, Workers: ${maxWorkers})`); + // Adapter registered (logged per-persona during init — not actionable) } /** @@ -85,7 +85,7 @@ export class ResourceManager { this.systemResources.availableGpuMemory += adapter.gpuMemoryUsed; this.systemResources.activeWorkers -= adapter.activeWorkers; this.adapters.delete(adapterId); - console.log(`🔧 ResourceManager: Unregistered adapter ${adapter.displayName}`); + // Adapter unregistered } } diff --git a/src/system/user/server/CallerDetector.ts b/src/system/user/server/CallerDetector.ts index b4249cdf4..2d8d35546 100644 --- a/src/system/user/server/CallerDetector.ts +++ b/src/system/user/server/CallerDetector.ts @@ -9,9 +9,8 @@ import type { JTAGContext, CallerType, CallerCapabilities } from '../core/types/JTAGTypes'; import type { UUID } from '../core/types/CrossPlatformUUID'; -import { ORM } from '../../daemons/data-daemon/server/ORM'; -import { COLLECTIONS } from '../data/config/DatabaseConfig'; import type { UserEntity } from '../data/entities/UserEntity'; +import { UserEntityCache } from './UserEntityCache'; /** * Detect caller type from JTAGContext and userId @@ -35,9 +34,9 @@ export async function detectCallerType(context: JTAGContext, userId: UUID): Prom return context.callerType; } - // 2. Look up user by userId + // 2. Look up user by userId (cached — eliminates ~134 ORM reads/session) try { - const user = await ORM.read(COLLECTIONS.USERS, userId); + const user = await UserEntityCache.instance.read(userId); if (!user) { console.warn(`CallerDetector: User not found for userId=${userId}, defaulting to 'script'`); @@ -77,7 +76,7 @@ export async function detectCallerType(context: JTAGContext, userId: UUID): Prom */ export async function getCallerCapabilities(userId: UUID): Promise { try { - const user = await ORM.read(COLLECTIONS.USERS, userId); + const user = await UserEntityCache.instance.read(userId); if (!user) { console.warn(`CallerDetector: User not found for userId=${userId}, returning default capabilities`); diff --git a/src/system/user/server/PersonaUser.ts b/src/system/user/server/PersonaUser.ts index 355a01f4d..3aa26ed5c 100644 --- a/src/system/user/server/PersonaUser.ts +++ b/src/system/user/server/PersonaUser.ts @@ -675,63 +675,71 @@ export class PersonaUser extends AIUser { // This enables fast-path decisions (<1ms) for should-respond, priority, deduplication // Also wires the bridge to inbox for Rust-backed channel routing try { + // Phase A: Rust bridge must init first — everything else depends on it await this._rustCognition?.initialize(); if (this._rustCognition) { this.inbox.setRustBridge(this._rustCognition); } this.log.info(`🦀 ${this.displayName}: Rust cognition bridge connected (inbox routing enabled)`); - // Sync rate limiter config to Rust (mirrors TS RateLimiter config) + // Phase B: These are independent of each other — run in parallel + // - Rate limiter sync (~5ms) + // - Adapter sync + genome sync (~20-50ms) + // - Corpus ORM query (~100-500ms, I/O bound) if (this._rustCognition) { - const rlConfig = this.rateLimiter.getConfig(); - await this._rustCognition.configureRateLimiter( - rlConfig.minSecondsBetweenResponses, - rlConfig.maxResponsesPerSession - ); - this.log.info(`🦀 ${this.displayName}: Rate limiter synced to Rust (min=${rlConfig.minSecondsBetweenResponses}s, max=${rlConfig.maxResponsesPerSession})`); - } - - // Sync genome adapter registry to Rust for model selection - if (this._rustCognition && this.memory?.genome) { - const adapters = this.memory.genome.getAllAdapters().map(a => ({ - name: a.getName(), - domain: a.getDomain(), - ollama_model_name: a.getTrainedModelName() ?? undefined, - is_loaded: a.isLoaded(), - is_current: a === this.memory!.genome.getCurrentAdapter(), - priority: a.getPriority(), - })); - if (adapters.length > 0) { - await this._rustCognition.syncAdapters(adapters as any); - this.log.info(`🦀 ${this.displayName}: ${adapters.length} adapters synced to Rust for model selection`); + const parallelTasks: Promise[] = []; + + // Task 1: Sync rate limiter config to Rust + parallelTasks.push((async () => { + const rlConfig = this.rateLimiter.getConfig(); + await this._rustCognition!.configureRateLimiter( + rlConfig.minSecondsBetweenResponses, + rlConfig.maxResponsesPerSession + ); + this.log.info(`🦀 ${this.displayName}: Rate limiter synced to Rust (min=${rlConfig.minSecondsBetweenResponses}s, max=${rlConfig.maxResponsesPerSession})`); + })()); + + // Task 2: Sync genome adapters to Rust for model selection + LRU eviction + if (this.memory?.genome) { + parallelTasks.push((async () => { + const adapters = this.memory!.genome.getAllAdapters().map(a => ({ + name: a.getName(), + domain: a.getDomain(), + ollama_model_name: a.getTrainedModelName() ?? undefined, + is_loaded: a.isLoaded(), + is_current: a === this.memory!.genome.getCurrentAdapter(), + priority: a.getPriority(), + })); + if (adapters.length > 0) { + await this._rustCognition!.syncAdapters(adapters as any); + this.log.info(`🦀 ${this.displayName}: ${adapters.length} adapters synced to Rust for model selection`); + } + this.memory!.genome.setRustBridge(this._rustCognition!); + await this.memory!.genome.syncToRust(); + this.log.info(`🦀 ${this.displayName}: Genome paging engine synced to Rust`); + })()); } - // Wire Rust bridge into genome for LRU eviction decisions - this.memory.genome.setRustBridge(this._rustCognition); - await this.memory.genome.syncToRust(); - this.log.info(`🦀 ${this.displayName}: Genome paging engine synced to Rust`); + // Task 3: Load corpus from ORM (I/O bound — overlaps with sync tasks above) + // Then load into Rust compute engine for sub-millisecond 6-layer parallel recall + parallelTasks.push((async () => { + try { + const { memories, events } = await this.loadCorpusFromORM(); + const corpusResult = await this._rustCognition!.memoryLoadCorpus(memories, events); + this.log.info(`${this.displayName}: Rust corpus loaded — ${corpusResult.memory_count} memories (${corpusResult.embedded_memory_count} embedded), ${corpusResult.timeline_event_count} events (${corpusResult.embedded_event_count} embedded) in ${corpusResult.load_time_ms.toFixed(1)}ms`); + } catch (error) { + this.log.error(`${this.displayName}: Corpus load failed:`, error); + // Non-fatal — recall will return empty results until corpus is loaded + } + })()); + + await Promise.all(parallelTasks); } } catch (error) { this.log.error(`🦀 ${this.displayName}: Rust cognition init failed (messages will error):`, error); // Don't throw - let persona initialize, but message handling will fail loudly } - // STEP 1.5.2: Load memory corpus into Rust compute engine - // Bulk-loads all memories + timeline events from ORM (longterm.db) into Rust's - // in-memory corpus. This enables sub-millisecond 6-layer parallel recall. - // Data path: ORM (DataOpen/DataList) → map to Rust types → IPC → DashMap> - // Must happen AFTER bridge.initialize() and BEFORE any RAG/recall usage. - if (this._rustCognition) { - try { - const { memories, events } = await this.loadCorpusFromORM(); - const corpusResult = await this._rustCognition.memoryLoadCorpus(memories, events); - this.log.info(`${this.displayName}: Rust corpus loaded — ${corpusResult.memory_count} memories (${corpusResult.embedded_memory_count} embedded), ${corpusResult.timeline_event_count} events (${corpusResult.embedded_event_count} embedded) in ${corpusResult.load_time_ms.toFixed(1)}ms`); - } catch (error) { - this.log.error(`${this.displayName}: Corpus load failed:`, error); - // Non-fatal — recall will return empty results until corpus is loaded - } - } - // STEP 1.6: Register with ResourceManager for holistic resource allocation try { const { getResourceManager } = await import('../../resources/shared/ResourceManager.js'); diff --git a/src/system/user/server/UserEntityCache.ts b/src/system/user/server/UserEntityCache.ts new file mode 100644 index 000000000..1abba137e --- /dev/null +++ b/src/system/user/server/UserEntityCache.ts @@ -0,0 +1,105 @@ +/** + * UserEntityCache - In-memory TTL cache for UserEntity reads + * + * Eliminates redundant ORM.read(COLLECTIONS.USERS, userId) calls. + * Before this cache: + * - CallerDetector reads UserEntity on EVERY command (no cache) + * - UserDaemon monitoring loop reads ALL users every 5s + * - SessionDaemon reads user on every session lookup + * = hundreds of redundant reads/minute → IPC socket saturation → ORM timeouts + * + * After: + * - First read populates cache, subsequent reads are O(1) Map lookups + * - Bulk setAll() from monitoring loop pre-warms the cache + * - 60s TTL ensures staleness is bounded + * - invalidate() for immediate eviction on known mutations + */ + +import type { UUID } from '../../core/types/CrossPlatformUUID'; +import { ORM } from '../../../daemons/data-daemon/server/ORM'; +import { COLLECTIONS } from '../../shared/Constants'; +import type { UserEntity } from '../../data/entities/UserEntity'; + +interface CacheEntry { + entity: UserEntity; + expiresAt: number; +} + +export class UserEntityCache { + private static _instance: UserEntityCache; + + private readonly _cache = new Map(); + private readonly _ttlMs = 60_000; // 60 seconds + + static get instance(): UserEntityCache { + if (!UserEntityCache._instance) { + UserEntityCache._instance = new UserEntityCache(); + } + return UserEntityCache._instance; + } + + /** + * Get a cached UserEntity, or null if missing/expired. + */ + get(userId: UUID): UserEntity | null { + const entry = this._cache.get(userId); + if (!entry) return null; + if (Date.now() > entry.expiresAt) { + this._cache.delete(userId); + return null; + } + return entry.entity; + } + + /** + * Cache a single UserEntity. + */ + set(userId: UUID, entity: UserEntity): void { + this._cache.set(userId, { + entity, + expiresAt: Date.now() + this._ttlMs, + }); + } + + /** + * Bulk-cache an array of UserEntities (e.g., from monitoring loop query). + * Single timestamp calculation for the whole batch. + */ + setAll(entities: UserEntity[]): void { + const expiresAt = Date.now() + this._ttlMs; + for (const entity of entities) { + if (entity.id) { + this._cache.set(entity.id, { entity, expiresAt }); + } + } + } + + /** + * Invalidate a specific user (call after known mutations). + */ + invalidate(userId: UUID): void { + this._cache.delete(userId); + } + + /** + * Read-through cache: returns cached entity or fetches from ORM. + * This is the primary API for callers replacing raw ORM.read(). + */ + async read(userId: UUID): Promise { + const cached = this.get(userId); + if (cached) return cached; + + const entity = await ORM.read(COLLECTIONS.USERS, userId); + if (entity) { + this.set(userId, entity); + } + return entity; + } + + /** + * Number of cached entries (for diagnostics). + */ + get size(): number { + return this._cache.size; + } +} diff --git a/src/system/user/server/modules/PersonaAutonomousLoop.ts b/src/system/user/server/modules/PersonaAutonomousLoop.ts index 2c07d0dc4..e1dde1de1 100644 --- a/src/system/user/server/modules/PersonaAutonomousLoop.ts +++ b/src/system/user/server/modules/PersonaAutonomousLoop.ts @@ -132,11 +132,17 @@ export class PersonaAutonomousLoop { // If this is a task, update status to 'in_progress' in database (prevents re-polling) if (item.type === 'task') { - await ORM.update( - COLLECTIONS.TASKS, - item.taskId, - { status: 'in_progress', startedAt: new Date() } - ); + try { + await ORM.update( + COLLECTIONS.TASKS, + item.taskId, + { status: 'in_progress', startedAt: new Date() } + ); + } catch { + // Task was deleted between dequeue and execution — skip it + this.log(`⚠️ ${this.personaUser.displayName}: Task ${item.taskId.slice(0, 8)} vanished before execution`); + return; + } } // Activate appropriate LoRA adapter based on domain diff --git a/src/system/user/server/modules/PersonaTaskExecutor.ts b/src/system/user/server/modules/PersonaTaskExecutor.ts index 3d46c6eb1..c264eef29 100644 --- a/src/system/user/server/modules/PersonaTaskExecutor.ts +++ b/src/system/user/server/modules/PersonaTaskExecutor.ts @@ -134,22 +134,27 @@ export class PersonaTaskExecutor { // Update task in database with completion status const duration = Date.now() - startTime; - await ORM.update( - COLLECTIONS.TASKS, - task.taskId, - { - status, - completedAt: new Date(), - result: { - success: status === 'completed', - output: outcome, - error: status === 'failed' ? outcome : undefined, - metrics: { - latencyMs: duration + try { + await ORM.update( + COLLECTIONS.TASKS, + task.taskId, + { + status, + completedAt: new Date(), + result: { + success: status === 'completed', + output: outcome, + error: status === 'failed' ? outcome : undefined, + metrics: { + latencyMs: duration + } } } - } - ); + ); + } catch { + // Task was deleted between dequeue and completion — work was still done, just can't record it + this.log(`⚠️ ${this.displayName}: Task ${task.taskId.slice(0, 8)} vanished during execution (deleted externally?)`); + } // Record activity in persona state (affects energy/mood) const complexity = task.priority; // Use priority as proxy for complexity diff --git a/src/system/voice/server/AIAudioInjector.ts b/src/system/voice/server/AIAudioInjector.ts index 5a7ff562b..f1163c8c9 100644 --- a/src/system/voice/server/AIAudioInjector.ts +++ b/src/system/voice/server/AIAudioInjector.ts @@ -208,7 +208,6 @@ export class AIAudioInjector { * This needs to be fixed in VoiceSynthesizeServerCommand to include session context. */ static subscribeToTTSEvents(personaId: string, personaName: string): () => void { - console.log(`🎙️ ${personaName}: Subscribing to TTS audio events (PROTOTYPE - needs callId in events)`); // Track active injectors by call ID const activeInjectors = new Map(); diff --git a/src/widgets/browser/services/WidgetEventServiceBrowser.ts b/src/widgets/browser/services/WidgetEventServiceBrowser.ts index ef58c5ede..659e83c65 100644 --- a/src/widgets/browser/services/WidgetEventServiceBrowser.ts +++ b/src/widgets/browser/services/WidgetEventServiceBrowser.ts @@ -6,13 +6,17 @@ */ import { WidgetEventService } from '../../shared/services/events/WidgetEventService'; +import { EventsDaemonBrowser } from '../../../daemons/events-daemon/browser/EventsDaemonBrowser'; export class WidgetEventServiceBrowser extends WidgetEventService { - // Inherits serviceName from base class - + // Track DOM listeners for proper cleanup (document.addEventListener persists across renders) + private _domListeners = new Map(); + // Track DOM interest unregister functions for EventsDaemonBrowser scope filtering + private _domInterestCleanups: Array<() => void> = []; + // Override DOM event coordination for browser environment emitCustomEvent(eventName: string, detail: any): void { - const customEvent = new CustomEvent(eventName, { + const customEvent = new CustomEvent(eventName, { detail: { ...detail, source: this.context?.widgetId || 'unknown', @@ -24,26 +28,46 @@ export class WidgetEventServiceBrowser extends WidgetEventService { // Emit on document for global listening document.dispatchEvent(customEvent); - + // Also handle locally if we have handlers this.handleCustomEvent(eventName, customEvent); - + console.debug(`🎯 WidgetEventServiceBrowser: Emitted DOM custom event '${eventName}'`); } subscribeToCustomEvents(eventName: string, handler: (event: CustomEvent) => void): void { if (!this.customEventHandlers.has(eventName)) { this.customEventHandlers.set(eventName, []); - - // Add DOM listener on first subscription - document.addEventListener(eventName, (event) => { + + // Store reference for cleanup — document listeners persist, so we must track them + const listener = (event: Event) => { this.handleCustomEvent(eventName, event as CustomEvent); - }); + }; + this._domListeners.set(eventName, listener); + document.addEventListener(eventName, listener); + + // Register DOM interest so EventsDaemonBrowser knows to dispatch this event to DOM + this._domInterestCleanups.push(EventsDaemonBrowser.registerDOMInterest(eventName)); } - + this.customEventHandlers.get(eventName)!.push(handler); console.debug(`🎯 WidgetEventServiceBrowser: Subscribed to DOM custom event '${eventName}'`); } - // Browser implementation inherits all protected methods and properties + async cleanup(): Promise { + // Remove all document-level DOM listeners (prevents memory leak across widget lifecycle) + for (const [eventName, listener] of this._domListeners) { + document.removeEventListener(eventName, listener); + } + this._domListeners.clear(); + + // Unregister DOM interest from EventsDaemonBrowser scope filtering + for (const unregister of this._domInterestCleanups) { + unregister(); + } + this._domInterestCleanups = []; + + // Call base class cleanup (clears handler maps) + await super.cleanup(); + } } \ No newline at end of file diff --git a/src/widgets/shared/BaseContentWidget.ts b/src/widgets/shared/BaseContentWidget.ts index 184fcf841..2531cc274 100644 --- a/src/widgets/shared/BaseContentWidget.ts +++ b/src/widgets/shared/BaseContentWidget.ts @@ -7,6 +7,7 @@ */ import { BaseWidget, type WidgetConfig } from './BaseWidget'; +import { EventsDaemonBrowser } from '../../daemons/events-daemon/browser/EventsDaemonBrowser'; import type { UUID } from '../../system/core/types/CrossPlatformUUID'; import type { ContentItem, @@ -291,6 +292,11 @@ export abstract class BaseContentWidget extends BaseWidget { private setupContentEventListeners(): void { this.verbose() && console.log(`🎯 ${this.config.widgetName}: Setting up content event listeners...`); + // Register DOM interest for content events (filter-first pattern) + EventsDaemonBrowser.registerDOMInterest('content:switched'); + EventsDaemonBrowser.registerDOMInterest('content:opened'); + EventsDaemonBrowser.registerDOMInterest('content:closed'); + // Listen for global content events document.addEventListener('content:switched', (event: Event) => { const customEvent = event as Event & { detail: ContentEventData }; diff --git a/src/widgets/shared/BaseWidget.ts b/src/widgets/shared/BaseWidget.ts index 86ddbf1cb..20d2a3d31 100644 --- a/src/widgets/shared/BaseWidget.ts +++ b/src/widgets/shared/BaseWidget.ts @@ -26,6 +26,7 @@ import { WIDGET_DEFAULTS} from './WidgetConstants'; import type { CommandErrorResponse, CommandResponse, CommandSuccessResponse } from '../../daemons/command-daemon/shared/CommandResponseTypes'; import { Commands } from '../../system/core/shared/Commands'; import { FILE_COMMANDS } from '../../commands/file/shared/FileCommandConstants'; +import { EventsDaemonBrowser } from '../../daemons/events-daemon/browser/EventsDaemonBrowser'; import type { UserEntity } from '../../system/data/entities/UserEntity'; import type { UserStateEntity } from '../../system/data/entities/UserStateEntity'; import type { DataListParams, DataListResult } from '../../commands/data/list/shared/DataListTypes'; @@ -789,6 +790,9 @@ export abstract class BaseWidget extends HTMLElement { this.verbose() && console.log(`🔗 BaseWidget: Setting up event dispatcher for ${eventName}`); + // Register DOM interest so EventsDaemonBrowser knows to dispatch this event to DOM + EventsDaemonBrowser.registerDOMInterest(eventName); + // Listen for server-originated events via the JTAG event system // These events come from EventsDaemon when server emits events document.addEventListener(eventName, (event: Event) => { diff --git a/src/widgets/shared/EntityScroller.ts b/src/widgets/shared/EntityScroller.ts index c7cc7efeb..b71083540 100644 --- a/src/widgets/shared/EntityScroller.ts +++ b/src/widgets/shared/EntityScroller.ts @@ -503,7 +503,7 @@ export function createScroller( // Check for duplicate using EntityManager if (entityManager.has(entityId)) { - console.log(`🔄 EntityScroller: Updating existing entity: ${entityId}`); + // Update existing entity in-place // Update existing entity entityManager.update(entityId, entity); @@ -516,7 +516,7 @@ export function createScroller( existingElement.replaceWith(newElement); } - console.log(`🔧 EntityScroller: Updated existing entity with ID: ${entityId}`); + // Entity updated return; } @@ -541,7 +541,7 @@ export function createScroller( // Check for duplicate using EntityManager if (entityManager.has(entityId)) { - console.log(`🔄 EntityScroller: Updating existing entity: ${entityId}`); + // Update existing entity in-place // Update existing entity - no auto-scroll for updates entityManager.update(entityId, entity); @@ -554,7 +554,7 @@ export function createScroller( existingElement.replaceWith(newElement); } - console.log(`🔧 EntityScroller: Updated existing entity with ID: ${entityId}`); + // Entity updated return; } diff --git a/src/workers/continuum-core/src/modules/logger.rs b/src/workers/continuum-core/src/modules/logger.rs index 162836023..72ce336e4 100644 --- a/src/workers/continuum-core/src/modules/logger.rs +++ b/src/workers/continuum-core/src/modules/logger.rs @@ -115,6 +115,20 @@ pub struct WriteLogResult { pub bytes_written: usize, } +/// Payload for log/write-batch requests (multiple entries in one IPC call). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WriteLogBatchPayload { + pub entries: Vec, +} + +/// Result of log/write-batch command. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WriteLogBatchResult { + pub entries_queued: usize, +} + /// Result of log/ping command. #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export, export_to = "../../../shared/generated/logger/LoggerPingResult.ts")] @@ -586,6 +600,30 @@ impl LoggerModule { }) } + fn handle_write_batch(&self, params: Value) -> Result { + // Extract payload (WorkerClient nests under "payload", support both patterns) + let payload_value = if let Some(nested) = params.get("payload") { + nested.clone() + } else { + params.clone() + }; + + let batch: WriteLogBatchPayload = + serde_json::from_value(payload_value).map_err(|e| format!("Invalid batch payload: {e}"))?; + + let count = batch.entries.len(); + for entry in batch.entries { + // Queue each entry through the existing channel (writer thread handles actual I/O) + let _ = self.log_tx.try_send(entry); + } + + self.requests_processed.fetch_add(1, Ordering::Relaxed); + + CommandResult::json(&WriteLogBatchResult { + entries_queued: count, + }) + } + fn handle_ping(&self) -> Result { let active_categories = self.file_cache.lock().unwrap_or_else(|e| e.into_inner()).len(); @@ -632,6 +670,7 @@ impl ServiceModule for LoggerModule { ) -> Result { match command { "log/write" => self.handle_write(params), + "log/write-batch" => self.handle_write_batch(params), "log/ping" => self.handle_ping(), _ => Err(format!("Unknown logger command: {command}")), }