@@ -308,7 +308,7 @@ func (m *Manager) findCanonicalAssertionBranch(
308308 cursor = assertion .AssertionHash
309309 m .assertionChainData .latestAgreedAssertion = cursor
310310 m .assertionChainData .canonicalAssertions [cursor ] = assertion
311- m .observedCanonicalAssertions <- cursor
311+ m .sendToConfirmationQueue ( cursor , "findCanonicalAssertionBranch" )
312312 }
313313 }
314314 }
@@ -365,7 +365,7 @@ func (m *Manager) respondToAnyInvalidAssertions(
365365 m .assertionChainData .canonicalAssertions [postedAssertionHash ] = postedRival
366366 m .submittedAssertions .Insert (postedAssertionHash )
367367 m .submittedRivalsCount ++
368- m .observedCanonicalAssertions <- postedAssertionHash
368+ m .sendToConfirmationQueue ( postedAssertionHash , "respondToAnyInvalidAssertions" )
369369 }
370370 }
371371 }
@@ -559,3 +559,26 @@ func (m *Manager) saveAssertionToDB(ctx context.Context, creationInfo *protocol.
559559 Status : status .String (),
560560 })
561561}
562+
563+ // Send assertion to confirmation queue
564+ func (m * Manager ) sendToConfirmationQueue (assertionHash protocol.AssertionHash , addedBy string ) {
565+ m .confirmQueueMutex .Lock ()
566+ defer m .confirmQueueMutex .Unlock ()
567+
568+ // Check if assertion is already in confirmation queue
569+ if m .confirming .Has (assertionHash ) {
570+ log .Debug ("Assertion already in confirmation queue" , "assertionHash" , assertionHash , "addedBy" , addedBy )
571+ return // Already in confirmation queue, skip
572+ }
573+ log .Info ("Sending assertion to confirmation queue" , "assertionHash" , assertionHash , "addedBy" , addedBy )
574+ // Mark as confirming
575+ m .confirming .Insert (assertionHash )
576+
577+ // Send to confirmation queue
578+ select {
579+ case m .observedCanonicalAssertions <- assertionHash :
580+ default :
581+ m .confirming .Delete (assertionHash )
582+ log .Warn ("Failed to send assertion to confirmation queue: channel full" , "assertionHash" , assertionHash , "addedBy" , addedBy )
583+ }
584+ }
0 commit comments