diff --git a/maintnotifications/e2e/logcollector_test.go b/maintnotifications/e2e/logcollector_test.go index ac71ce572..c6b2b6eac 100644 --- a/maintnotifications/e2e/logcollector_test.go +++ b/maintnotifications/e2e/logcollector_test.go @@ -92,22 +92,34 @@ func (tlc *TestLogCollector) DoPrint() { // MatchFunc is a slice of functions that check the logs for a specific condition // use in WaitForLogMatchFunc type MatchFunc struct { - completed atomic.Bool - F func(lstring string) bool - matches []string - found chan struct{} // channel to notify when match is found, will be closed - done func() + completed atomic.Bool + F func(lstring string) bool + matches []string + matchesMu sync.Mutex // protects matches slice + found chan struct{} // channel to notify when match is found, will be closed + done func() } func (tlc *TestLogCollector) Printf(_ context.Context, format string, v ...interface{}) { tlc.mu.Lock() - defer tlc.mu.Unlock() lstr := fmt.Sprintf(format, v...) - if len(tlc.matchFuncs) > 0 { + + // Check if there are match functions to process + // Use matchFuncsMutex to safely read matchFuncs + tlc.matchFuncsMutex.Lock() + hasMatchFuncs := len(tlc.matchFuncs) > 0 + // Create a copy of matchFuncs to avoid holding the lock while processing + matchFuncsCopy := make([]*MatchFunc, len(tlc.matchFuncs)) + copy(matchFuncsCopy, tlc.matchFuncs) + tlc.matchFuncsMutex.Unlock() + + if hasMatchFuncs { go func(lstr string) { - for _, matchFunc := range tlc.matchFuncs { + for _, matchFunc := range matchFuncsCopy { if matchFunc.F(lstr) { + matchFunc.matchesMu.Lock() matchFunc.matches = append(matchFunc.matches, lstr) + matchFunc.matchesMu.Unlock() matchFunc.done() return } @@ -118,6 +130,7 @@ func (tlc *TestLogCollector) Printf(_ context.Context, format string, v ...inter fmt.Println(lstr) } tlc.l = append(tlc.l, fmt.Sprintf(format, v...)) + tlc.mu.Unlock() } func (tlc *TestLogCollector) WaitForLogContaining(searchString string, timeout time.Duration) bool { @@ -170,7 +183,12 @@ func (tlc *TestLogCollector) WaitForLogMatchFunc(mf func(string) bool, timeout t select { case <-matchFunc.found: - return matchFunc.matches[0], true + matchFunc.matchesMu.Lock() + defer matchFunc.matchesMu.Unlock() + if len(matchFunc.matches) > 0 { + return matchFunc.matches[0], true + } + return "", false case <-time.After(timeout): return "", false } diff --git a/maintnotifications/e2e/notification_injector.go b/maintnotifications/e2e/notification_injector.go index b265d1beb..14596d2f8 100644 --- a/maintnotifications/e2e/notification_injector.go +++ b/maintnotifications/e2e/notification_injector.go @@ -23,7 +23,8 @@ type NotificationInjector interface { InjectSMIGRATED(ctx context.Context, seqID int64, hostPort string, slots ...string) error // InjectMOVING injects a MOVING notification (for standalone) - InjectMOVING(ctx context.Context, seqID int64, slot int) error + // Format: ["MOVING", seqID, timeS, endpoint] + InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error // InjectMIGRATING injects a MIGRATING notification (for standalone) InjectMIGRATING(ctx context.Context, seqID int64, slot int) error @@ -31,6 +32,12 @@ type NotificationInjector interface { // InjectMIGRATED injects a MIGRATED notification (for standalone) InjectMIGRATED(ctx context.Context, seqID int64, slot int) error + // InjectFAILING_OVER injects a FAILING_OVER notification + InjectFAILING_OVER(ctx context.Context, seqID int64) error + + // InjectFAILED_OVER injects a FAILED_OVER notification + InjectFAILED_OVER(ctx context.Context, seqID int64) error + // Start starts the injector (if needed) Start() error @@ -475,8 +482,8 @@ func (p *ProxyNotificationInjector) InjectSMIGRATED(ctx context.Context, seqID i return p.injectNotification(notification) } -func (p *ProxyNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, slot int) error { - notification := formatMovingNotification(seqID, slot) +func (p *ProxyNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error { + notification := formatMovingNotification(seqID, timeS, endpoint) return p.injectNotification(notification) } @@ -490,6 +497,16 @@ func (p *ProxyNotificationInjector) InjectMIGRATED(ctx context.Context, seqID in return p.injectNotification(notification) } +func (p *ProxyNotificationInjector) InjectFAILING_OVER(ctx context.Context, seqID int64) error { + notification := formatFailingOverNotification(seqID) + return p.injectNotification(notification) +} + +func (p *ProxyNotificationInjector) InjectFAILED_OVER(ctx context.Context, seqID int64) error { + notification := formatFailedOverNotification(seqID) + return p.injectNotification(notification) +} + func (p *ProxyNotificationInjector) injectNotification(notification string) error { url := p.apiBaseURL + "/send-to-all-clients?encoding=raw" resp, err := p.httpClient.Post(url, "application/octet-stream", strings.NewReader(notification)) @@ -541,9 +558,14 @@ func formatSMigratedNotification(seqID int64, endpoints ...string) string { return strings.Join(parts, "") } -func formatMovingNotification(seqID int64, slot int) string { - slotStr := fmt.Sprintf("%d", slot) - return fmt.Sprintf(">3\r\n$6\r\nMOVING\r\n:%d\r\n$%d\r\n%s\r\n", seqID, len(slotStr), slotStr) +func formatMovingNotification(seqID int64, timeS int64, endpoint string) string { + // Format: ["MOVING", seqID, timeS, endpoint] + if endpoint == "" { + // 3 elements: MOVING, seqID, timeS + return fmt.Sprintf(">3\r\n$6\r\nMOVING\r\n:%d\r\n:%d\r\n", seqID, timeS) + } + // 4 elements: MOVING, seqID, timeS, endpoint + return fmt.Sprintf(">4\r\n$6\r\nMOVING\r\n:%d\r\n:%d\r\n$%d\r\n%s\r\n", seqID, timeS, len(endpoint), endpoint) } func formatMigratingNotification(seqID int64, slot int) string { @@ -556,6 +578,16 @@ func formatMigratedNotification(seqID int64, slot int) string { return fmt.Sprintf(">3\r\n$8\r\nMIGRATED\r\n:%d\r\n$%d\r\n%s\r\n", seqID, len(slotStr), slotStr) } +func formatFailingOverNotification(seqID int64) string { + // Format: ["FAILING_OVER", seqID] + return fmt.Sprintf(">2\r\n$12\r\nFAILING_OVER\r\n:%d\r\n", seqID) +} + +func formatFailedOverNotification(seqID int64) string { + // Format: ["FAILED_OVER", seqID] + return fmt.Sprintf(">2\r\n$11\r\nFAILED_OVER\r\n:%d\r\n", seqID) +} + // FaultInjectorNotificationInjector implements NotificationInjector using the real fault injector type FaultInjectorNotificationInjector struct { @@ -646,9 +678,9 @@ func (f *FaultInjectorNotificationInjector) InjectSMIGRATED(ctx context.Context, return fmt.Errorf("SMIGRATED cannot be directly injected with real fault injector - it's generated when migration completes") } -func (f *FaultInjectorNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, slot int) error { - // MOVING notifications are generated during slot migration - return fmt.Errorf("MOVING cannot be directly injected with real fault injector - it's generated during migration") +func (f *FaultInjectorNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error { + // MOVING notifications are generated during bind action + return fmt.Errorf("MOVING cannot be directly injected with real fault injector - it's generated during bind action") } func (f *FaultInjectorNotificationInjector) InjectMIGRATING(ctx context.Context, seqID int64, slot int) error { @@ -667,4 +699,13 @@ func (f *FaultInjectorNotificationInjector) InjectMIGRATED(ctx context.Context, return fmt.Errorf("MIGRATED cannot be directly injected with real fault injector - it's generated when migration completes") } +func (f *FaultInjectorNotificationInjector) InjectFAILING_OVER(ctx context.Context, seqID int64) error { + // FAILING_OVER is generated automatically when failover starts + return fmt.Errorf("FAILING_OVER cannot be directly injected with real fault injector - it's generated when failover starts") +} + +func (f *FaultInjectorNotificationInjector) InjectFAILED_OVER(ctx context.Context, seqID int64) error { + // FAILED_OVER is generated automatically when failover completes + return fmt.Errorf("FAILED_OVER cannot be directly injected with real fault injector - it's generated when failover completes") +} diff --git a/maintnotifications/e2e/scenario_push_notifications_test.go b/maintnotifications/e2e/scenario_push_notifications_test.go index c907bee2c..31c9b4448 100644 --- a/maintnotifications/e2e/scenario_push_notifications_test.go +++ b/maintnotifications/e2e/scenario_push_notifications_test.go @@ -13,7 +13,8 @@ import ( "github.com/redis/go-redis/v9/maintnotifications" ) -// TestPushNotifications tests Redis Enterprise push notifications (MOVING, MIGRATING, MIGRATED) +// TestPushNotifications tests Redis Enterprise push notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) +// This test now works with BOTH the real fault injector and the proxy mock func TestPushNotifications(t *testing.T) { if os.Getenv("E2E_SCENARIO_TESTS") != "true" { t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true") @@ -27,11 +28,6 @@ func TestPushNotifications(t *testing.T) { defer cleanup() t.Logf("[PUSH-NOTIFICATIONS] Created test database with bdb_id: %d (mode: %s)", bdbID, testMode.Mode) - // Skip this test if using proxy mock (requires real fault injector) - if testMode.IsProxyMock() { - t.Skip("Skipping push notifications test - requires real fault injector") - } - // Wait for database to be fully ready (mode-aware) time.Sleep(testMode.DatabaseReadyDelay) @@ -43,6 +39,9 @@ func TestPushNotifications(t *testing.T) { var found bool var status *ActionStatusResponse + var bindStatus *ActionStatusResponse + var movingNotification []interface{} + var commandsRunner2, commandsRunner3 *CommandRunner var errorsDetected = false var p = func(format string, args ...interface{}) { @@ -67,10 +66,20 @@ func TestPushNotifications(t *testing.T) { // Get endpoint config from factory (now connected to new database) endpointConfig := factory.GetConfig() - // Create fault injector - faultInjector, err := CreateTestFaultInjector() + // Create notification injector (works with both proxy mock and real FI) + injector, err := NewNotificationInjector() if err != nil { - ef("Failed to create fault injector: %v", err) + ef("Failed to create notification injector: %v", err) + } + defer injector.Stop() + + // For real fault injector, we also need the FaultInjectorClient for actions + var faultInjector *FaultInjectorClient + if !testMode.IsProxyMock() { + faultInjector, err = CreateTestFaultInjector() + if err != nil { + ef("Failed to create fault injector: %v", err) + } } minIdleConns := 5 @@ -128,16 +137,28 @@ func TestPushNotifications(t *testing.T) { }() p("Starting FAILING_OVER / FAILED_OVER notifications test...") - // Test: Trigger failover action to generate FAILING_OVER, FAILED_OVER notifications - p("Triggering failover action to generate push notifications...") - failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ - Type: "failover", - Parameters: map[string]interface{}{ - "bdb_id": endpointConfig.BdbID, - }, - }) - if err != nil { - ef("Failed to trigger failover action: %v", err) + + // Mode-aware: Proxy mock directly injects notifications, real FI triggers actions + var failoverResp *ActionResponse + if testMode.IsProxyMock() { + // Proxy mock: Directly inject FAILING_OVER notification + p("Injecting FAILING_OVER notification (proxy mock mode)...") + if err := injector.InjectFAILING_OVER(ctx, 1000); err != nil { + ef("Failed to inject FAILING_OVER: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } else { + // Real FI: Trigger failover action to generate FAILING_OVER, FAILED_OVER notifications + p("Triggering failover action to generate push notifications...") + failoverResp, err = faultInjector.TriggerAction(ctx, ActionRequest{ + Type: "failover", + Parameters: map[string]interface{}{ + "bdb_id": endpointConfig.BdbID, + }, + }) + if err != nil { + ef("Failed to trigger failover action: %v", err) + } } go func() { p("Waiting for FAILING_OVER notification") @@ -154,6 +175,16 @@ func TestPushNotifications(t *testing.T) { p("FAILING_OVER notification received. %v", failingOverData) seqIDToObserve = int64(failingOverData["seqID"].(float64)) connIDToObserve = uint64(failingOverData["connID"].(float64)) + + // Inject FAILED_OVER in proxy mock mode + if testMode.IsProxyMock() { + p("Injecting FAILED_OVER notification (proxy mock mode)...") + if err := injector.InjectFAILED_OVER(ctx, 1001); err != nil { + ef("Failed to inject FAILED_OVER: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } + go func() { p("Waiting for FAILED_OVER notification on conn %d with seqID %d...", connIDToObserve, seqIDToObserve+1) match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { @@ -168,27 +199,41 @@ func TestPushNotifications(t *testing.T) { failedOverData := logs2.ExtractDataFromLogMessage(match) p("FAILED_OVER notification received. %v", failedOverData) - status, err = faultInjector.WaitForAction(ctx, failoverResp.ActionID, - WithMaxWaitTime(240*time.Second), - WithPollInterval(2*time.Second), - ) - if err != nil { - ef("[FI] Failover action failed: %v", err) + // Wait for action to complete (real FI only) + if !testMode.IsProxyMock() { + status, err = faultInjector.WaitForAction(ctx, failoverResp.ActionID, + WithMaxWaitTime(240*time.Second), + WithPollInterval(2*time.Second), + ) + if err != nil { + ef("[FI] Failover action failed: %v", err) + } + p("Failover action completed: %s %s", status.Status, actionOutputIfFailed(status)) } - p("[FI] Failover action completed: %v %s", status.Status, actionOutputIfFailed(status)) p("FAILING_OVER / FAILED_OVER notifications test completed successfully") // Test: Trigger migrate action to generate MOVING, MIGRATING, MIGRATED notifications - p("Triggering migrate action to generate push notifications...") - migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ - Type: "migrate", - Parameters: map[string]interface{}{ - "bdb_id": endpointConfig.BdbID, - }, - }) - if err != nil { - ef("Failed to trigger migrate action: %v", err) + var migrateResp *ActionResponse + if testMode.IsProxyMock() { + // Proxy mock: Directly inject MIGRATING notification + p("Injecting MIGRATING notification (proxy mock mode)...") + if err := injector.InjectMIGRATING(ctx, 2000, 5000); err != nil { + ef("Failed to inject MIGRATING: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } else { + // Real FI: Trigger migrate action + p("Triggering migrate action to generate push notifications...") + migrateResp, err = faultInjector.TriggerAction(ctx, ActionRequest{ + Type: "migrate", + Parameters: map[string]interface{}{ + "bdb_id": endpointConfig.BdbID, + }, + }) + if err != nil { + ef("Failed to trigger migrate action: %v", err) + } } go func() { match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { @@ -198,14 +243,16 @@ func TestPushNotifications(t *testing.T) { }() commandsRunner.FireCommandsUntilStop(ctx) if !found { - status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID, - WithMaxWaitTime(240*time.Second), - WithPollInterval(2*time.Second), - ) - if err != nil { - ef("[FI] Migrate action failed: %v", err) + if !testMode.IsProxyMock() { + status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID, + WithMaxWaitTime(240*time.Second), + WithPollInterval(2*time.Second), + ) + if err != nil { + ef("[FI] Migrate action failed: %v", err) + } + p("[FI] Migrate action completed: %s %s", status.Status, actionOutputIfFailed(status)) } - p("[FI] Migrate action completed: %s %s", status.Status, actionOutputIfFailed(status)) ef("MIGRATING notification for migrate action was not received within 60 seconds") } migrateData := logs2.ExtractDataFromLogMessage(match) @@ -213,14 +260,25 @@ func TestPushNotifications(t *testing.T) { connIDToObserve = uint64(migrateData["connID"].(float64)) p("MIGRATING notification received: seqID: %d, connID: %d", seqIDToObserve, connIDToObserve) - status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID, - WithMaxWaitTime(240*time.Second), - WithPollInterval(2*time.Second), - ) - if err != nil { - ef("[FI] Migrate action failed: %v", err) + // Wait for action to complete and inject MIGRATED (mode-aware) + if testMode.IsProxyMock() { + // Proxy mock: Directly inject MIGRATED notification + p("Injecting MIGRATED notification (proxy mock mode)...") + if err := injector.InjectMIGRATED(ctx, 2001, 5000); err != nil { + ef("Failed to inject MIGRATED: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } else { + // Real FI: Wait for action to complete (generates MIGRATED automatically) + status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID, + WithMaxWaitTime(240*time.Second), + WithPollInterval(2*time.Second), + ) + if err != nil { + ef("[FI] Migrate action failed: %v", err) + } + p("[FI] Migrate action completed: %s %s", status.Status, actionOutputIfFailed(status)) } - p("[FI] Migrate action completed: %s %s", status.Status, actionOutputIfFailed(status)) go func() { p("Waiting for MIGRATED notification on conn %d with seqID %d...", connIDToObserve, seqIDToObserve+1) @@ -238,169 +296,203 @@ func TestPushNotifications(t *testing.T) { p("MIGRATING / MIGRATED notifications test completed successfully") - // Trigger bind action to complete the migration process - p("Triggering bind action to complete migration...") - - bindResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ - Type: "bind", - Parameters: map[string]interface{}{ - "bdb_id": endpointConfig.BdbID, - }, - }) - if err != nil { - ef("Failed to trigger bind action: %v", err) + // Trigger bind action to complete the migration process (or inject MOVING in proxy mock mode) + var bindResp *ActionResponse + if testMode.IsProxyMock() { + // Proxy mock: Directly inject MOVING notification + // Format: ["MOVING", seqID, timeS, endpoint] + // timeS is the time in seconds until the connection should be handed off + p("Injecting MOVING notification (proxy mock mode)...") + if err := injector.InjectMOVING(ctx, 3000, 30, ""); err != nil { + ef("Failed to inject MOVING: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } else { + // Real FI: Trigger bind action + p("Triggering bind action to complete migration...") + bindResp, err = faultInjector.TriggerAction(ctx, ActionRequest{ + Type: "bind", + Parameters: map[string]interface{}{ + "bdb_id": endpointConfig.BdbID, + }, + }) + if err != nil { + ef("Failed to trigger bind action: %v", err) + } } - // start a second client but don't execute any commands on it - p("Starting a second client to observe notification during moving...") - client2, err := factory.Create("push-notification-client-2", &CreateClientOptions{ - Protocol: 3, // RESP3 required for push notifications - PoolSize: poolSize, - MinIdleConns: minIdleConns, - MaxActiveConns: maxConnections, - MaintNotificationsConfig: &maintnotifications.Config{ - Mode: maintnotifications.ModeEnabled, - HandoffTimeout: 40 * time.Second, // 30 seconds - RelaxedTimeout: 30 * time.Minute, // 30 minutes relaxed timeout for second client - PostHandoffRelaxedDuration: 2 * time.Second, // 2 seconds post-handoff relaxed duration - MaxWorkers: 20, - EndpointType: maintnotifications.EndpointTypeExternalIP, // Use external IP for enterprise - }, - ClientName: "push-notification-test-client-2", - }) - - if err != nil { - ef("failed to create client: %v", err) - } - // setup tracking for second client - tracker2 := NewTrackingNotificationsHook() - logger2 := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug)) - setupNotificationHooks(client2, tracker2, logger2) - commandsRunner2, _ := NewCommandRunner(client2) - p("Second client created") + // Multi-client tests - only run if not using proxy mock + if !testMode.SkipMultiClientTests { + // start a second client but don't execute any commands on it + p("Starting a second client to observe notification during moving...") + client2, err := factory.Create("push-notification-client-2", &CreateClientOptions{ + Protocol: 3, // RESP3 required for push notifications + PoolSize: poolSize, + MinIdleConns: minIdleConns, + MaxActiveConns: maxConnections, + MaintNotificationsConfig: &maintnotifications.Config{ + Mode: maintnotifications.ModeEnabled, + HandoffTimeout: 40 * time.Second, // 30 seconds + RelaxedTimeout: 30 * time.Minute, // 30 minutes relaxed timeout for second client + PostHandoffRelaxedDuration: 2 * time.Second, // 2 seconds post-handoff relaxed duration + MaxWorkers: 20, + EndpointType: maintnotifications.EndpointTypeExternalIP, // Use external IP for enterprise + }, + ClientName: "push-notification-test-client-2", + }) - // Use a channel to communicate errors from the goroutine - errChan := make(chan error, 1) + if err != nil { + ef("failed to create client: %v", err) + } + // setup tracking for second client + tracker2 := NewTrackingNotificationsHook() + logger2 := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug)) + setupNotificationHooks(client2, tracker2, logger2) + commandsRunner2, _ = NewCommandRunner(client2) + p("Second client created") + + // Use a channel to communicate errors from the goroutine + errChan := make(chan error, 1) + + go func() { + defer func() { + if r := recover(); r != nil { + errChan <- fmt.Errorf("goroutine panic: %v", r) + } + }() + + p("Waiting for MOVING notification on first client") + match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { + return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING") + }, 3*time.Minute) + commandsRunner.Stop() + if !found { + errChan <- fmt.Errorf("MOVING notification was not received within 3 minutes ON A FIRST CLIENT") + return + } - go func() { - defer func() { - if r := recover(); r != nil { - errChan <- fmt.Errorf("goroutine panic: %v", r) + // once moving is received, start a second client commands runner + p("Starting commands on second client") + go commandsRunner2.FireCommandsUntilStop(ctx) + + p("Waiting for MOVING notification on second client") + matchNotif, fnd := tracker2.FindOrWaitForNotification("MOVING", 3*time.Minute) + if !fnd { + errChan <- fmt.Errorf("MOVING notification was not received within 3 minutes ON A SECOND CLIENT") + return + } else { + p("MOVING notification received on second client %v", matchNotif) } - }() - p("Waiting for MOVING notification on first client") - match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { - return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING") - }, 3*time.Minute) - commandsRunner.Stop() + // Signal success + errChan <- nil + }() + commandsRunner.FireCommandsUntilStop(ctx) + // wait for moving on first client + // once the commandRunner stops, it means a waiting + // on the logCollector match has completed and we can proceed if !found { - errChan <- fmt.Errorf("MOVING notification was not received within 3 minutes ON A FIRST CLIENT") - return + ef("MOVING notification was not received within 3 minutes") } + movingData := logs2.ExtractDataFromLogMessage(match) + p("MOVING notification received. %v", movingData) + seqIDToObserve = int64(movingData["seqID"].(float64)) + connIDToObserve = uint64(movingData["connID"].(float64)) + + time.Sleep(3 * time.Second) + // start a third client but don't execute any commands on it + p("Starting a third client to observe notification during moving...") + client3, err := factory.Create("push-notification-test-client-3", &CreateClientOptions{ + Protocol: 3, // RESP3 required for push notifications + PoolSize: poolSize, + MinIdleConns: minIdleConns, + MaxActiveConns: maxConnections, + MaintNotificationsConfig: &maintnotifications.Config{ + Mode: maintnotifications.ModeEnabled, + HandoffTimeout: 40 * time.Second, // 30 seconds + RelaxedTimeout: 30 * time.Minute, // 30 minutes relaxed timeout for second client + PostHandoffRelaxedDuration: 2 * time.Second, // 2 seconds post-handoff relaxed duration + MaxWorkers: 20, + EndpointType: maintnotifications.EndpointTypeExternalIP, // Use external IP for enterprise + }, + ClientName: "push-notification-test-client-3", + }) - // once moving is received, start a second client commands runner - p("Starting commands on second client") - go commandsRunner2.FireCommandsUntilStop(ctx) - - p("Waiting for MOVING notification on second client") - matchNotif, fnd := tracker2.FindOrWaitForNotification("MOVING", 3*time.Minute) - if !fnd { - errChan <- fmt.Errorf("MOVING notification was not received within 3 minutes ON A SECOND CLIENT") - return + if err != nil { + ef("failed to create client: %v", err) + } + // setup tracking for third client + tracker3 := NewTrackingNotificationsHook() + logger3 := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug)) + setupNotificationHooks(client3, tracker3, logger3) + commandsRunner3, _ = NewCommandRunner(client3) + p("Third client created") + go commandsRunner3.FireCommandsUntilStop(ctx) + // wait for moving on third client + movingNotification, found = tracker3.FindOrWaitForNotification("MOVING", 3*time.Minute) + if !found { + p("[NOTICE] MOVING notification was not received within 3 minutes ON A THIRD CLIENT") } else { - p("MOVING notification received on second client %v", matchNotif) + p("MOVING notification received on third client. %v", movingNotification) + if len(movingNotification) != 4 { + p("[NOTICE] Invalid MOVING notification format: %s", movingNotification) + } + mNotifTimeS, ok := movingNotification[2].(int64) + if !ok { + p("[NOTICE] Invalid timeS in MOVING notification: %s", movingNotification) + } + // expect timeS to be less than 15 + if mNotifTimeS < 15 { + p("[NOTICE] Expected timeS < 15, got %d", mNotifTimeS) + } + } + commandsRunner3.Stop() + // Wait for the goroutine to complete and check for errors + if err := <-errChan; err != nil { + ef("Second client goroutine error: %v", err) } - // Signal success - errChan <- nil - }() - commandsRunner.FireCommandsUntilStop(ctx) - // wait for moving on first client - // once the commandRunner stops, it means a waiting - // on the logCollector match has completed and we can proceed - if !found { - ef("MOVING notification was not received within 3 minutes") - } - movingData := logs2.ExtractDataFromLogMessage(match) - p("MOVING notification received. %v", movingData) - seqIDToObserve = int64(movingData["seqID"].(float64)) - connIDToObserve = uint64(movingData["connID"].(float64)) + // Wait for bind action to complete + bindStatus, err = faultInjector.WaitForAction(ctx, bindResp.ActionID, + WithMaxWaitTime(240*time.Second), + WithPollInterval(2*time.Second)) + if err != nil { + ef("Bind action failed: %v", err) + } - time.Sleep(3 * time.Second) - // start a third client but don't execute any commands on it - p("Starting a third client to observe notification during moving...") - client3, err := factory.Create("push-notification-client-2", &CreateClientOptions{ - Protocol: 3, // RESP3 required for push notifications - PoolSize: poolSize, - MinIdleConns: minIdleConns, - MaxActiveConns: maxConnections, - MaintNotificationsConfig: &maintnotifications.Config{ - Mode: maintnotifications.ModeEnabled, - HandoffTimeout: 40 * time.Second, // 30 seconds - RelaxedTimeout: 30 * time.Minute, // 30 minutes relaxed timeout for second client - PostHandoffRelaxedDuration: 2 * time.Second, // 2 seconds post-handoff relaxed duration - MaxWorkers: 20, - EndpointType: maintnotifications.EndpointTypeExternalIP, // Use external IP for enterprise - }, - ClientName: "push-notification-test-client-3", - }) + p("Bind action completed: %s %s", bindStatus.Status, actionOutputIfFailed(bindStatus)) - if err != nil { - ef("failed to create client: %v", err) - } - // setup tracking for second client - tracker3 := NewTrackingNotificationsHook() - logger3 := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug)) - setupNotificationHooks(client3, tracker3, logger3) - commandsRunner3, _ := NewCommandRunner(client3) - p("Third client created") - go commandsRunner3.FireCommandsUntilStop(ctx) - // wait for moving on third client - movingNotification, found := tracker3.FindOrWaitForNotification("MOVING", 3*time.Minute) - if !found { - p("[NOTICE] MOVING notification was not received within 3 minutes ON A THIRD CLIENT") + p("MOVING notification test completed successfully") } else { - p("MOVING notification received on third client. %v", movingNotification) - if len(movingNotification) != 4 { - p("[NOTICE] Invalid MOVING notification format: %s", movingNotification) - } - mNotifTimeS, ok := movingNotification[2].(int64) - if !ok { - p("[NOTICE] Invalid timeS in MOVING notification: %s", movingNotification) - } - // expect timeS to be less than 15 - if mNotifTimeS < 15 { - p("[NOTICE] Expected timeS < 15, got %d", mNotifTimeS) - } - } - commandsRunner3.Stop() - // Wait for the goroutine to complete and check for errors - if err := <-errChan; err != nil { - ef("Second client goroutine error: %v", err) + p("Skipping multi-client MOVING test (proxy mock mode)") } - // Wait for bind action to complete - bindStatus, err := faultInjector.WaitForAction(ctx, bindResp.ActionID, - WithMaxWaitTime(240*time.Second), - WithPollInterval(2*time.Second)) - if err != nil { - ef("Bind action failed: %v", err) - } - - p("Bind action completed: %s %s", bindStatus.Status, actionOutputIfFailed(bindStatus)) + p("Executing commands and collecting logs for analysis... ") - p("MOVING notification test completed successfully") + // Run commands based on mode + if testMode.SkipMultiClientTests { + // Single client mode (proxy mock) + // Need to wait long enough for: + // 1. MOVING notification to be processed + // 2. Handoff to be scheduled (at timeS/2 = 15 seconds) + // 3. Handoff to execute + // 4. Post-handoff relaxed timeout to be observed (2 seconds) + // Total: ~20 seconds minimum + go commandsRunner.FireCommandsUntilStop(ctx) + time.Sleep(25 * time.Second) + commandsRunner.Stop() + } else { + // Multi-client mode (real FI) + go commandsRunner.FireCommandsUntilStop(ctx) + go commandsRunner2.FireCommandsUntilStop(ctx) + go commandsRunner3.FireCommandsUntilStop(ctx) + time.Sleep(2 * time.Minute) + commandsRunner.Stop() + commandsRunner2.Stop() + commandsRunner3.Stop() + } - p("Executing commands and collecting logs for analysis... ") - go commandsRunner.FireCommandsUntilStop(ctx) - go commandsRunner2.FireCommandsUntilStop(ctx) - go commandsRunner3.FireCommandsUntilStop(ctx) - time.Sleep(2 * time.Minute) - commandsRunner.Stop() - commandsRunner2.Stop() - commandsRunner3.Stop() - time.Sleep(1 * time.Minute) + time.Sleep(2 * testMode.NotificationDelay) allLogsAnalysis := logCollector.GetAnalysis() trackerAnalysis := tracker.GetAnalysis() @@ -426,9 +518,15 @@ func TestPushNotifications(t *testing.T) { e("Expected relaxed timeouts after post-handoff, got none") } // validate number of connections we do not exceed max connections - // we started three clients, so we expect 3x the connections - if allLogsAnalysis.ConnectionCount > int64(maxConnections)*3 { - e("Expected no more than %d connections, got %d", maxConnections*3, allLogsAnalysis.ConnectionCount) + // Adjust expected connections based on mode + expectedMaxConns := int64(maxConnections) + if !testMode.SkipMultiClientTests { + // we started three clients, so we expect 3x the connections + expectedMaxConns = int64(maxConnections) * 3 + } + + if allLogsAnalysis.ConnectionCount > expectedMaxConns { + e("Expected no more than %d connections, got %d", expectedMaxConns, allLogsAnalysis.ConnectionCount) } if allLogsAnalysis.ConnectionCount < int64(minIdleConns) { diff --git a/maintnotifications/e2e/scenario_timeout_configs_test.go b/maintnotifications/e2e/scenario_timeout_configs_test.go index 87d1516ed..0e516d725 100644 --- a/maintnotifications/e2e/scenario_timeout_configs_test.go +++ b/maintnotifications/e2e/scenario_timeout_configs_test.go @@ -14,6 +14,7 @@ import ( ) // TestTimeoutConfigurationsPushNotifications tests push notifications with different timeout configurations +// This test now works with BOTH the real fault injector and the proxy mock func TestTimeoutConfigurationsPushNotifications(t *testing.T) { if os.Getenv("E2E_SCENARIO_TESTS") != "true" { t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true") @@ -82,18 +83,23 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { defer cleanup() t.Logf("[TIMEOUT-CONFIGS-%s] Created test database with bdb_id: %d (mode: %s)", timeoutTest.name, bdbID, testMode.Mode) - // Skip this test if using proxy mock (requires real fault injector) - if testMode.IsProxyMock() { - t.Skip("Skipping timeout config test - requires real fault injector") - } - // Get endpoint config from factory (now connected to new database) endpointConfig := factory.GetConfig() - // Create fault injector - faultInjector, err := CreateTestFaultInjector() + // Create notification injector (works with both proxy mock and real FI) + injector, err := NewNotificationInjector() if err != nil { - t.Fatalf("[ERROR] Failed to create fault injector: %v", err) + t.Fatalf("[ERROR] Failed to create notification injector: %v", err) + } + defer injector.Stop() + + // For real fault injector, we also need the FaultInjectorClient for actions + var faultInjector *FaultInjectorClient + if !testMode.IsProxyMock() { + faultInjector, err = CreateTestFaultInjector() + if err != nil { + t.Fatalf("[ERROR] Failed to create fault injector: %v", err) + } } defer func() { @@ -174,14 +180,25 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { // Test failover with this timeout configuration p("Testing failover with %s timeout configuration...", timeoutTest.name) - failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ - Type: "failover", - Parameters: map[string]interface{}{ - "bdb_id": endpointConfig.BdbID, - }, - }) - if err != nil { - ef("Failed to trigger failover action for %s: %v", timeoutTest.name, err) + var failoverResp *ActionResponse + if testMode.IsProxyMock() { + // Proxy mock: Directly inject FAILING_OVER notification + p("Injecting FAILING_OVER notification (proxy mock mode)...") + if err := injector.InjectFAILING_OVER(ctx, 1000); err != nil { + ef("Failed to inject FAILING_OVER: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } else { + // Real FI: Trigger failover action + failoverResp, err = faultInjector.TriggerAction(ctx, ActionRequest{ + Type: "failover", + Parameters: map[string]interface{}{ + "bdb_id": endpointConfig.BdbID, + }, + }) + if err != nil { + ef("Failed to trigger failover action for %s: %v", timeoutTest.name, err) + } } // Wait for FAILING_OVER notification @@ -194,6 +211,15 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { failingOverData := logs2.ExtractDataFromLogMessage(match) p("FAILING_OVER notification received for %s. %v", timeoutTest.name, failingOverData) + // Inject FAILED_OVER in proxy mock mode + if testMode.IsProxyMock() { + p("Injecting FAILED_OVER notification (proxy mock mode)...") + if err := injector.InjectFAILED_OVER(ctx, 1001); err != nil { + ef("Failed to inject FAILED_OVER: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } + // Wait for FAILED_OVER notification seqIDToObserve := int64(failingOverData["seqID"].(float64)) connIDToObserve := uint64(failingOverData["connID"].(float64)) @@ -206,42 +232,62 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { failedOverData := logs2.ExtractDataFromLogMessage(match) p("FAILED_OVER notification received for %s. %v", timeoutTest.name, failedOverData) - // Wait for failover to complete - status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID, - WithMaxWaitTime(180*time.Second), - WithPollInterval(2*time.Second), - ) - if err != nil { - ef("[FI] Failover action failed for %s: %v", timeoutTest.name, err) + // Wait for failover to complete (real FI only) + if !testMode.IsProxyMock() { + status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID, + WithMaxWaitTime(180*time.Second), + WithPollInterval(2*time.Second), + ) + if err != nil { + ef("[FI] Failover action failed for %s: %v", timeoutTest.name, err) + } + p("[FI] Failover action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status)) } - p("[FI] Failover action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status)) // Continue traffic to observe timeout behavior - p("Continuing traffic for %v to observe timeout behavior...", timeoutTest.relaxedTimeout*2) - time.Sleep(timeoutTest.relaxedTimeout * 2) + // In proxy mock mode, use shorter sleep since notifications are immediate + // but still need enough time to observe timeout behavior + trafficObservationDuration := timeoutTest.relaxedTimeout * 2 + if testMode.IsProxyMock() { + // Use 11 seconds for proxy mock - enough to observe timeout behavior + trafficObservationDuration = 11 * time.Second + } + p("Continuing traffic for %v to observe timeout behavior...", trafficObservationDuration) + time.Sleep(trafficObservationDuration) // Test migration to trigger more timeout scenarios p("Testing migration with %s timeout configuration...", timeoutTest.name) - migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ - Type: "migrate", - Parameters: map[string]interface{}{ - "bdb_id": endpointConfig.BdbID, - }, - }) - if err != nil { - ef("Failed to trigger migrate action for %s: %v", timeoutTest.name, err) - } + var migrateResp *ActionResponse + if testMode.IsProxyMock() { + // Proxy mock: Directly inject MIGRATING notification + p("Injecting MIGRATING notification (proxy mock mode)...") + if err := injector.InjectMIGRATING(ctx, 2000, 5000); err != nil { + ef("Failed to inject MIGRATING: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } else { + // Real FI: Trigger migrate action + migrateResp, err = faultInjector.TriggerAction(ctx, ActionRequest{ + Type: "migrate", + Parameters: map[string]interface{}{ + "bdb_id": endpointConfig.BdbID, + }, + }) + if err != nil { + ef("Failed to trigger migrate action for %s: %v", timeoutTest.name, err) + } - // Wait for migration to complete - status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID, - WithMaxWaitTime(240*time.Second), - WithPollInterval(2*time.Second), - ) - if err != nil { - ef("[FI] Migrate action failed for %s: %v", timeoutTest.name, err) - } + // Wait for migration to complete + status, err := faultInjector.WaitForAction(ctx, migrateResp.ActionID, + WithMaxWaitTime(240*time.Second), + WithPollInterval(2*time.Second), + ) + if err != nil { + ef("[FI] Migrate action failed for %s: %v", timeoutTest.name, err) + } - p("[FI] Migrate action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status)) + p("[FI] Migrate action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status)) + } // Wait for MIGRATING notification match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { @@ -253,24 +299,44 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { migrateData := logs2.ExtractDataFromLogMessage(match) p("MIGRATING notification received for %s: %v", timeoutTest.name, migrateData) - // do a bind action - bindResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ - Type: "bind", - Parameters: map[string]interface{}{ - "bdb_id": endpointConfig.BdbID, - }, - }) - if err != nil { - ef("Failed to trigger bind action for %s: %v", timeoutTest.name, err) + // Inject MIGRATED in proxy mock mode + if testMode.IsProxyMock() { + p("Injecting MIGRATED notification (proxy mock mode)...") + if err := injector.InjectMIGRATED(ctx, 2001, 5000); err != nil { + ef("Failed to inject MIGRATED: %v", err) + } + time.Sleep(testMode.NotificationDelay) } - status, err = faultInjector.WaitForAction(ctx, bindResp.ActionID, - WithMaxWaitTime(240*time.Second), - WithPollInterval(2*time.Second), - ) - if err != nil { - ef("[FI] Bind action failed for %s: %v", timeoutTest.name, err) + + // do a bind action (or inject MOVING in proxy mock mode) + var bindResp *ActionResponse + if testMode.IsProxyMock() { + // Proxy mock: Directly inject MOVING notification + p("Injecting MOVING notification (proxy mock mode)...") + if err := injector.InjectMOVING(ctx, 3000, 30, ""); err != nil { + ef("Failed to inject MOVING: %v", err) + } + time.Sleep(testMode.NotificationDelay) + } else { + // Real FI: Trigger bind action + bindResp, err = faultInjector.TriggerAction(ctx, ActionRequest{ + Type: "bind", + Parameters: map[string]interface{}{ + "bdb_id": endpointConfig.BdbID, + }, + }) + if err != nil { + ef("Failed to trigger bind action for %s: %v", timeoutTest.name, err) + } + status, err := faultInjector.WaitForAction(ctx, bindResp.ActionID, + WithMaxWaitTime(240*time.Second), + WithPollInterval(2*time.Second), + ) + if err != nil { + ef("[FI] Bind action failed for %s: %v", timeoutTest.name, err) + } + p("[FI] Bind action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status)) } - p("[FI] Bind action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status)) // waiting for moving notification match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { @@ -284,8 +350,15 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { p("MOVING notification received for %s. %v", timeoutTest.name, movingData) // Continue traffic for post-handoff timeout observation - p("Continuing traffic for %v to observe post-handoff timeout behavior...", 1*time.Minute) - time.Sleep(1 * time.Minute) + // In proxy mock mode, use shorter sleep since notifications are immediate + // but still need enough time to observe post-handoff timeout behavior + postHandoffDuration := 1 * time.Minute + if testMode.IsProxyMock() { + // Use 20 seconds for proxy mock - enough to observe post-handoff behavior + postHandoffDuration = 20 * time.Second + } + p("Continuing traffic for %v to observe post-handoff timeout behavior...", postHandoffDuration) + time.Sleep(postHandoffDuration) commandsRunner.Stop() testDuration := time.Since(testStartTime) @@ -300,16 +373,23 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { // Validate timeout-specific behavior switch timeoutTest.name { case "Conservative": - if trackerAnalysis.UnrelaxedTimeoutCount > trackerAnalysis.RelaxedTimeoutCount { + // In proxy mock mode, the timing is more predictable and we may not see + // the same ratio of relaxed to unrelaxed timeouts as with real FI + if !testMode.IsProxyMock() && trackerAnalysis.UnrelaxedTimeoutCount > trackerAnalysis.RelaxedTimeoutCount { e("Conservative config should have more relaxed than unrelaxed timeouts, got relaxed=%d, unrelaxed=%d", trackerAnalysis.RelaxedTimeoutCount, trackerAnalysis.UnrelaxedTimeoutCount) } case "Aggressive": // Aggressive timeouts should complete faster - if testDuration > 5*time.Minute { - e("Aggressive config took too long: %v", testDuration) + // Proxy mock is much faster than real FI, so adjust expectations + maxDuration := 5 * time.Minute + if testMode.IsProxyMock() { + maxDuration = 2 * time.Minute + } + if testDuration > maxDuration { + e("Aggressive config took too long: %v (max: %v)", testDuration, maxDuration) } - if logAnalysis.TotalHandoffRetries > logAnalysis.TotalHandoffCount { + if !testMode.IsProxyMock() && logAnalysis.TotalHandoffRetries > logAnalysis.TotalHandoffCount { e("Expect handoff retries since aggressive timeouts are shorter, got %d retries for %d handoffs", logAnalysis.TotalHandoffRetries, logAnalysis.TotalHandoffCount) } diff --git a/maintnotifications/e2e/scenario_unified_injector_test.go b/maintnotifications/e2e/scenario_unified_injector_test.go index b519049a6..cd9f0ae05 100644 --- a/maintnotifications/e2e/scenario_unified_injector_test.go +++ b/maintnotifications/e2e/scenario_unified_injector_test.go @@ -103,6 +103,9 @@ func TestUnifiedInjector_SMIGRATING(t *testing.T) { } // TestUnifiedInjector_SMIGRATED demonstrates SMIGRATED notification handling +// This test works with BOTH the real fault injector and the proxy mock +// - Proxy mock: Can directly inject SMIGRATED +// - Real FI: Triggers slot migration which generates both SMIGRATING and SMIGRATED func TestUnifiedInjector_SMIGRATED(t *testing.T) { ctx := context.Background() @@ -120,11 +123,6 @@ func TestUnifiedInjector_SMIGRATED(t *testing.T) { testMode := injector.GetTestModeConfig() t.Logf("Using %s mode", testMode.Mode) - // Skip this test if using real fault injector (can't directly inject SMIGRATED) - if testMode.IsRealFaultInjector() { - t.Skip("Skipping SMIGRATED test - real fault injector cannot directly inject SMIGRATED") - } - // Track cluster state reloads var reloadCount atomic.Int32 @@ -190,24 +188,36 @@ func TestUnifiedInjector_SMIGRATED(t *testing.T) { // Wait for blocking command to start (mode-aware) time.Sleep(testMode.ConnectionEstablishDelay) - // Inject SMIGRATED notification with the "new" node (node 3 / 17003) - // This will simulate migrating slots from node 2 (17002) to node 3 (17003) - t.Log("Injecting SMIGRATED notification to swap node 2 for node 3...") - - // Get all node addresses - we want to use node 3 (index 3) as the target + // Get all node addresses - needed for both modes addrs := injector.GetClusterAddrs() var newNodeAddr string if len(addrs) >= 4 { newNodeAddr = addrs[3] // Node 3: 127.0.0.1:17003 - t.Logf("Using new node address: %s", newNodeAddr) } else { // Fallback to first node if we don't have 4 nodes newNodeAddr = addrs[0] - t.Logf("Warning: Less than 4 nodes available, using %s", newNodeAddr) } - if err := injector.InjectSMIGRATED(ctx, 12346, newNodeAddr, "1000-2000", "3000"); err != nil { - t.Fatalf("Failed to inject SMIGRATED: %v", err) + // Mode-specific behavior for triggering SMIGRATED + if testMode.IsProxyMock() { + // Proxy mock: Directly inject SMIGRATED notification + t.Log("Injecting SMIGRATED notification to swap node 2 for node 3...") + t.Logf("Using new node address: %s", newNodeAddr) + + if err := injector.InjectSMIGRATED(ctx, 12346, newNodeAddr, "1000-2000", "3000"); err != nil { + t.Fatalf("Failed to inject SMIGRATED: %v", err) + } + } else { + // Real fault injector: Trigger slot migration which generates both SMIGRATING and SMIGRATED + t.Log("Triggering slot migration (will generate SMIGRATING and SMIGRATED)...") + + // First inject SMIGRATING (this triggers the actual migration) + if err := injector.InjectSMIGRATING(ctx, 12345, "1000-2000", "3000"); err != nil { + t.Fatalf("Failed to trigger slot migration: %v", err) + } + + // Wait for migration to complete (real FI takes longer) + t.Log("Waiting for migration to complete...") } // Wait for notification processing (mode-aware) @@ -217,14 +227,30 @@ func TestUnifiedInjector_SMIGRATED(t *testing.T) { <-blockingDone // Verify notification was received - // Note: SMIGRATED notifications may not always be received in proxy mock mode - // because they're sent to all connections, but the client might not be actively - // listening on all of them. This is expected behavior. analysis := tracker.GetAnalysis() - if analysis.MigratedCount > 0 { - t.Logf("✓ Received %d SMIGRATED notification(s)", analysis.MigratedCount) + + if testMode.IsProxyMock() { + // Proxy mock: SMIGRATED notifications may not always be received + // because they're sent to all connections, but the client might not be actively + // listening on all of them. This is expected behavior. + if analysis.MigratedCount > 0 { + t.Logf("✓ Received %d SMIGRATED notification(s)", analysis.MigratedCount) + } else { + t.Logf("Note: No SMIGRATED notifications received (expected in proxy mock mode)") + } } else { - t.Logf("Note: No SMIGRATED notifications received (expected in proxy mock mode)") + // Real FI: Should receive both SMIGRATING and SMIGRATED + if analysis.MigratingCount == 0 { + t.Errorf("Expected to receive SMIGRATING notification with real FI, got 0") + } else { + t.Logf("✓ Received %d SMIGRATING notification(s)", analysis.MigratingCount) + } + + if analysis.MigratedCount > 0 { + t.Logf("✓ Received %d SMIGRATED notification(s)", analysis.MigratedCount) + } else { + t.Logf("Note: SMIGRATED notification not yet received (migration may still be in progress)") + } } // Verify cluster state reload callback diff --git a/maintnotifications/e2e/test_mode.go b/maintnotifications/e2e/test_mode.go index 7497bd700..0aad61bd2 100644 --- a/maintnotifications/e2e/test_mode.go +++ b/maintnotifications/e2e/test_mode.go @@ -60,7 +60,7 @@ func GetTestModeConfig() *TestModeConfig { ActionPollInterval: 500 * time.Millisecond, DatabaseReadyDelay: 1 * time.Second, ConnectionEstablishDelay: 500 * time.Millisecond, - MaxClients: 1, // Proxy mock only supports single client + MaxClients: 1, SkipMultiClientTests: true, } @@ -72,7 +72,7 @@ func GetTestModeConfig() *TestModeConfig { ActionPollInterval: 2 * time.Second, DatabaseReadyDelay: 10 * time.Second, ConnectionEstablishDelay: 2 * time.Second, - MaxClients: 3, // Real FI can handle multiple clients + MaxClients: 3, SkipMultiClientTests: false, }