Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions pkg/capabilities/registry/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ type triggerRegistration struct {
request capabilities.TriggerRegistrationRequest
outCh chan capabilities.TriggerResponse
cancel context.CancelFunc // used to shut down the forwarding goroutine when the trigger is unregistered
done chan struct{} // closed by forwarding goroutine upon exit
}

func newTriggerRegistrationManager(lggr logger.Logger) *triggerRegistrationManager {
Expand All @@ -240,18 +241,16 @@ func (m *triggerRegistrationManager) register(ctx context.Context, underlying ca
}

func (m *triggerRegistrationManager) unregister(ctx context.Context, underlying capabilities.TriggerExecutable, req capabilities.TriggerRegistrationRequest) error {
var out chan capabilities.TriggerResponse
if reg, ok := m.regs[req.TriggerID]; ok {
if reg.cancel != nil {
reg.cancel()
<-reg.done
}
if reg.outCh != nil {
close(reg.outCh)
}
out = reg.outCh
delete(m.regs, req.TriggerID)
}

if out != nil {
close(out)
}
return underlying.UnregisterTrigger(ctx, req)
}

Expand All @@ -274,13 +273,16 @@ func (m *triggerRegistrationManager) upsertRegistration(req capabilities.Trigger
}
if regInMap.cancel != nil {
regInMap.cancel() // shuts down the previous forwarding goroutine
<-regInMap.done
regInMap.cancel = nil
regInMap.done = nil
}
}
if in != nil {
ctxForward, cancel := context.WithCancel(context.Background())
regInMap.cancel = cancel
go forwardTriggerResponses(ctxForward, in, regInMap.outCh)
regInMap.done = make(chan struct{})
go forwardTriggerResponses(ctxForward, in, regInMap.outCh, func() { close(regInMap.done) })
}
return regInMap.outCh
}
Expand All @@ -302,7 +304,8 @@ func (m *triggerRegistrationManager) rebind(newUnderlying capabilities.TriggerEx
}
}

func forwardTriggerResponses(ctx context.Context, in <-chan capabilities.TriggerResponse, out chan<- capabilities.TriggerResponse) {
func forwardTriggerResponses(ctx context.Context, in <-chan capabilities.TriggerResponse, out chan<- capabilities.TriggerResponse, done func()) {
defer done()
for {
select {
case <-ctx.Done():
Expand Down
Loading