Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions event/multisub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ package event
// JoinSubscriptions joins multiple subscriptions to be able to track them as
// one entity and collectively cancel them or consume any errors from them.
func JoinSubscriptions(subs ...Subscription) Subscription {
// Handle empty subscription list to avoid blocking forever
if len(subs) == 0 {
return NewSubscription(func(unsubbed <-chan struct{}) error {
<-unsubbed
return nil
})
}
return NewSubscription(func(unsubbed <-chan struct{}) error {
// Unsubscribe all subscriptions before returning
defer func() {
Expand Down
27 changes: 27 additions & 0 deletions event/multisub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,30 @@ func TestMultisubFullUnsubscribe(t *testing.T) {
default:
}
}

func TestMultisubEmpty(t *testing.T) {
// Test that joining zero subscriptions doesn't block forever
sub := JoinSubscriptions()
if sub == nil {
t.Fatal("JoinSubscriptions() returned nil")
}
// Should be able to unsubscribe immediately without blocking
done := make(chan struct{})
go func() {
sub.Unsubscribe()
close(done)
}()
select {
case <-done:
// Success - unsubscribe completed
case <-time.After(100 * time.Millisecond):
t.Error("Unsubscribe blocked on empty subscription list")
}
// Error channel should be closed
select {
case <-sub.Err():
// Expected - channel is closed
default:
t.Error("error channel not closed after unsubscribe")
}
}
Loading