From 5f5a479d7662f6fe5587e2a36f84f36230e778d4 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Fri, 23 Jan 2026 10:17:51 -0600 Subject: [PATCH] pkg/capabilities/registry: fix chan race --- pkg/capabilities/registry/base.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/capabilities/registry/base.go b/pkg/capabilities/registry/base.go index 08e804866..6a3241494 100644 --- a/pkg/capabilities/registry/base.go +++ b/pkg/capabilities/registry/base.go @@ -222,6 +222,7 @@ type triggerRegistration struct { request capabilities.TriggerRegistrationRequest outCh chan capabilities.TriggerResponse cancel context.CancelFunc // used to shut down the forwarding goroutine when the trigger is unregistered + done chan struct{} // closed by forwarding goroutine upon exit } func newTriggerRegistrationManager(lggr logger.Logger) *triggerRegistrationManager { @@ -240,18 +241,16 @@ func (m *triggerRegistrationManager) register(ctx context.Context, underlying ca } func (m *triggerRegistrationManager) unregister(ctx context.Context, underlying capabilities.TriggerExecutable, req capabilities.TriggerRegistrationRequest) error { - var out chan capabilities.TriggerResponse if reg, ok := m.regs[req.TriggerID]; ok { if reg.cancel != nil { reg.cancel() + <-reg.done + } + if reg.outCh != nil { + close(reg.outCh) } - out = reg.outCh delete(m.regs, req.TriggerID) } - - if out != nil { - close(out) - } return underlying.UnregisterTrigger(ctx, req) } @@ -274,13 +273,16 @@ func (m *triggerRegistrationManager) upsertRegistration(req capabilities.Trigger } if regInMap.cancel != nil { regInMap.cancel() // shuts down the previous forwarding goroutine + <-regInMap.done regInMap.cancel = nil + regInMap.done = nil } } if in != nil { ctxForward, cancel := context.WithCancel(context.Background()) regInMap.cancel = cancel - go forwardTriggerResponses(ctxForward, in, regInMap.outCh) + regInMap.done = make(chan struct{}) + go forwardTriggerResponses(ctxForward, in, regInMap.outCh, func() { close(regInMap.done) }) } return regInMap.outCh } @@ -302,7 +304,8 @@ func (m *triggerRegistrationManager) rebind(newUnderlying capabilities.TriggerEx } } -func forwardTriggerResponses(ctx context.Context, in <-chan capabilities.TriggerResponse, out chan<- capabilities.TriggerResponse) { +func forwardTriggerResponses(ctx context.Context, in <-chan capabilities.TriggerResponse, out chan<- capabilities.TriggerResponse, done func()) { + defer done() for { select { case <-ctx.Done():