@@ -26,21 +26,28 @@ var (
2626
2727// AdvancerRepository defines the repository interface needed by the Advancer service
2828type AdvancerRepository interface {
29+ ListEpochs (ctx context.Context , nameOrAddress string , f repository.EpochFilter , p repository.Pagination , descending bool ) ([]* Epoch , uint64 , error )
2930 ListInputs (ctx context.Context , nameOrAddress string , f repository.InputFilter , p repository.Pagination , descending bool ) ([]* Input , uint64 , error )
3031 GetLastInput (ctx context.Context , appAddress string , epochIndex uint64 ) (* Input , error )
3132 StoreAdvanceResult (ctx context.Context , appID int64 , ar * AdvanceResult ) error
32- UpdateEpochsInputsProcessed (ctx context.Context , nameOrAddress string ) ([]uint64 , error )
33- UpdateEpochCommitment (ctx context.Context , appID int64 , epochIndex uint64 , commitment []byte ) error
33+ UpdateEpochInputsProcessed (ctx context.Context , nameOrAddress string , epochIndex uint64 ) error
34+ UpdateEpochOutputsProof (ctx context.Context , appID int64 , epochIndex uint64 , proof * OutputsProof ) error
35+ RepeatPreviousEpochOutputsProof (ctx context.Context , appID int64 , epochIndex uint64 ) error
3436 UpdateApplicationState (ctx context.Context , appID int64 , state ApplicationState , reason * string ) error
3537 GetEpoch (ctx context.Context , nameOrAddress string , index uint64 ) (* Epoch , error )
3638 UpdateInputSnapshotURI (ctx context.Context , appId int64 , inputIndex uint64 , snapshotURI string ) error
3739 GetLastSnapshot (ctx context.Context , nameOrAddress string ) (* Input , error )
3840 GetLastProcessedInput (ctx context.Context , appAddress string ) (* Input , error )
3941}
4042
43+ func getUnprocessedEpochs (ctx context.Context , er AdvancerRepository , address string ) ([]* Epoch , uint64 , error ) {
44+ f := repository.EpochFilter {Status : []EpochStatus {EpochStatus_Open , EpochStatus_Closed }}
45+ return er .ListEpochs (ctx , address , f , repository.Pagination {}, false )
46+ }
47+
4148// getUnprocessedInputs retrieves inputs that haven't been processed yet
42- func getUnprocessedInputs (ctx context.Context , repo AdvancerRepository , appAddress string ) ([]* Input , uint64 , error ) {
43- f := repository.InputFilter {Status : Pointer (InputCompletionStatus_None )}
49+ func getUnprocessedInputs (ctx context.Context , repo AdvancerRepository , appAddress string , epochIndex uint64 ) ([]* Input , uint64 , error ) {
50+ f := repository.InputFilter {Status : Pointer (InputCompletionStatus_None ), EpochIndex : & epochIndex }
4451 return repo .ListInputs (ctx , appAddress , f , repository.Pagination {}, false )
4552}
4653
@@ -65,38 +72,66 @@ func (s *Service) Step(ctx context.Context) error {
6572 for _ , app := range apps {
6673 appAddress := app .IApplicationAddress .String ()
6774
68- err := s . handleEpochSnapshotAfterInputProcessed (ctx , app )
75+ epochs , _ , err := getUnprocessedEpochs (ctx , s . repository , appAddress )
6976 if err != nil {
7077 return err
7178 }
7279
73- // Get unprocessed inputs for this application
74- s .Logger .Debug ("Querying for unprocessed inputs" , "application" , app .Name )
75- inputs , _ , err := getUnprocessedInputs (ctx , s .repository , appAddress )
76- if err != nil {
77- return err
78- }
80+ for _ , epoch := range epochs {
81+ // Get unprocessed inputs for this application
82+ s .Logger .Debug ("Querying for unprocessed inputs" , "application" , app .Name , "epoch_index" , epoch .Index )
83+ inputs , _ , err := getUnprocessedInputs (ctx , s .repository , appAddress , epoch .Index )
84+ if err != nil {
85+ return err
86+ }
7987
80- // Process the inputs
81- s .Logger .Debug ("Processing inputs" , "application" , app .Name , "count" , len (inputs ))
82- err = s .processInputs (ctx , app , inputs )
83- if err != nil {
84- return err
85- }
88+ // Process the inputs
89+ s .Logger .Debug ("Processing inputs" , "application" , app .Name , "epoch_index" , epoch . Index , "count" , len (inputs ))
90+ err = s .processInputs (ctx , app , inputs )
91+ if err != nil {
92+ return err
93+ }
8694
87- // Update epochs to mark inputs as processed
88- updatedEpochIndexes , err := s .repository .UpdateEpochsInputsProcessed (ctx , appAddress )
89- if err != nil {
90- return err
91- }
92- for _ , epochIndex := range updatedEpochIndexes {
93- s .Logger .Info ("Epoch updated to Inputs Processed" , "application" , app .Name , "epoch_index" , epochIndex )
95+ if epoch .Status == EpochStatus_Closed {
96+ if allProcessed , perr := s .isAllEpochInputsProcessed (app , epoch ); perr == nil && allProcessed {
97+ err := s .handleEpochAfterInputsProcessed (ctx , app , epoch )
98+ if err != nil {
99+ return err
100+ }
101+
102+ // Update epochs to mark inputs as processed
103+ err = s .repository .UpdateEpochInputsProcessed (ctx , appAddress , epoch .Index )
104+ if err != nil {
105+ return err
106+ }
107+ s .Logger .Info ("Epoch updated to Inputs Processed" , "application" , app .Name , "epoch_index" , epoch .Index )
108+ } else if perr != nil {
109+ return perr
110+ } else {
111+ break // some inputs were not processed yet, check next time
112+ }
113+ }
94114 }
95115 }
96116
97117 return nil
98118}
99119
120+ func (s * Service ) isAllEpochInputsProcessed (app * Application , epoch * Epoch ) (bool , error ) {
121+ // epoch has no inputs
122+ if epoch .InputIndexLowerBound == epoch .InputIndexUpperBound {
123+ return true , nil
124+ }
125+ machine , exists := s .machineManager .GetMachine (app .ID )
126+ if ! exists {
127+ return false , fmt .Errorf ("%w: %d" , ErrNoApp , app .ID )
128+ }
129+ if machine .ProcessedInputs () == epoch .InputIndexUpperBound {
130+ return true , nil
131+ }
132+ return false , nil
133+ }
134+
100135// processInputs handles the processing of inputs for an application
101136func (s * Service ) processInputs (ctx context.Context , app * Application , inputs []* Input ) error {
102137 // Skip if there are no inputs to process
@@ -213,24 +248,52 @@ func (s *Service) isEpochLastInput(ctx context.Context, app *Application, input
213248 return false , nil
214249}
215250
216- // handleEpochSnapshotAfterInputProcessed handles the snapshot creation after when an epoch is closed after an input was processed
217- func (s * Service ) handleEpochSnapshotAfterInputProcessed (ctx context.Context , app * Application ) error {
218- // Get the machine instance for this application
219- machine , exists := s .machineManager .GetMachine (app .ID )
220- if ! exists {
221- return fmt .Errorf ("%w: %d" , ErrNoApp , app .ID )
222- }
251+ // handleEpochAfterInputsProcessed handles the snapshot creation after when an epoch is closed after an input was processed
252+ func (s * Service ) handleEpochAfterInputsProcessed (ctx context.Context , app * Application , epoch * Epoch ) error {
253+ // if epoch has inputs, all data is updated after advance, just check for snapshot
254+ if epoch .InputIndexLowerBound != epoch .InputIndexUpperBound {
255+ // Get the machine instance for this application
256+ machine , exists := s .machineManager .GetMachine (app .ID )
257+ if ! exists {
258+ return fmt .Errorf ("%w: %d" , ErrNoApp , app .ID )
259+ }
223260
224- // Check if this is the last processed input
225- lastProcessedInput , err := s .repository .GetLastProcessedInput (ctx , app .IApplicationAddress .String ())
226- if err != nil {
227- return fmt .Errorf ("failed to get last input: %w" , err )
261+ // Check if this is the last processed input
262+ lastProcessedInput , err := s .repository .GetLastProcessedInput (ctx , app .IApplicationAddress .String ())
263+ if err != nil {
264+ return fmt .Errorf ("failed to get last input: %w" , err )
265+ }
266+
267+ // Check if the application has a epoch snapshot policy
268+ if lastProcessedInput != nil && app .ExecutionParameters .SnapshotPolicy == SnapshotPolicy_EveryEpoch {
269+ // Handle the snapshot
270+ return s .handleSnapshot (ctx , app , machine , lastProcessedInput )
271+ }
272+
273+ return nil
228274 }
229275
230- // Check if the application has a epoch snapshot policy
231- if lastProcessedInput != nil && app .ExecutionParameters .SnapshotPolicy == SnapshotPolicy_EveryEpoch {
232- // Handle the snapshot
233- return s .handleSnapshot (ctx , app , machine , lastProcessedInput )
276+ // if epoch has no inputs, we need to copy previous epoch Outputs Proof
277+ // first epoch we need to get it from the template
278+ if epoch .Index == 0 {
279+ // Get the machine instance for this application
280+ machine , exists := s .machineManager .GetMachine (app .ID )
281+ if ! exists {
282+ return fmt .Errorf ("%w: %d" , ErrNoApp , app .ID )
283+ }
284+ outputsProof , err := machine .OutputsProof (ctx , 0 )
285+ if err != nil {
286+ return fmt .Errorf ("failed to get outputs proof from machine: %w" , err )
287+ }
288+ err = s .repository .UpdateEpochOutputsProof (ctx , app .ID , epoch .Index , outputsProof )
289+ if err != nil {
290+ return fmt .Errorf ("failed to store outputs proof for epoch 0: %w" , err )
291+ }
292+ } else {
293+ err := s .repository .RepeatPreviousEpochOutputsProof (ctx , app .ID , epoch .Index )
294+ if err != nil {
295+ return fmt .Errorf ("failed to repeat previous epoch outputs proof: %w" , err )
296+ }
234297 }
235298
236299 return nil
@@ -294,17 +357,8 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine
294357 }
295358 }
296359
297- // Remove previous snapshot if it exists
298- previousSnapshot , err := s .repository .GetLastSnapshot (ctx , app .IApplicationAddress .String ())
299- if err != nil {
300- s .Logger .Error ("Failed to get previous snapshot" ,
301- "application" , app .Name ,
302- "error" , err )
303- // Continue even if we can't get the previous snapshot
304- }
305-
306360 // Create the snapshot
307- err = machine .CreateSnapshot (ctx , input .Index + 1 , snapshotPath )
361+ err : = machine .CreateSnapshot (ctx , input .Index + 1 , snapshotPath )
308362 if err != nil {
309363 return err
310364 }
@@ -318,6 +372,15 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine
318372 return fmt .Errorf ("failed to update input snapshot URI: %w" , err )
319373 }
320374
375+ // Get previous snapshot if it exists
376+ previousSnapshot , err := s .repository .GetLastSnapshot (ctx , app .IApplicationAddress .String ())
377+ if err != nil {
378+ s .Logger .Error ("Failed to get previous snapshot" ,
379+ "application" , app .Name ,
380+ "error" , err )
381+ // Continue even if we can't get the previous snapshot
382+ }
383+
321384 // Remove previous snapshot if it exists
322385 if previousSnapshot != nil && previousSnapshot .Index != input .Index && previousSnapshot .SnapshotURI != nil {
323386 // Only remove if it's a different snapshot than the one we just created
0 commit comments