diff --git a/client/swagger/models/deal_list_deal_request.go b/client/swagger/models/deal_list_deal_request.go index 0bbad9ab..ef88917f 100644 --- a/client/swagger/models/deal_list_deal_request.go +++ b/client/swagger/models/deal_list_deal_request.go @@ -20,6 +20,9 @@ import ( // swagger:model deal.ListDealRequest type DealListDealRequest struct { + // deal type filter (market for f05, pdp for f41) + DealTypes []ModelDealType `json:"dealTypes"` + // preparation ID or name filter Preparations []string `json:"preparations"` @@ -40,6 +43,10 @@ type DealListDealRequest struct { func (m *DealListDealRequest) Validate(formats strfmt.Registry) error { var res []error + if err := m.validateDealTypes(formats); err != nil { + res = append(res, err) + } + if err := m.validateStates(formats); err != nil { res = append(res, err) } @@ -50,6 +57,31 @@ func (m *DealListDealRequest) Validate(formats strfmt.Registry) error { return nil } +func (m *DealListDealRequest) validateDealTypes(formats strfmt.Registry) error { + if swag.IsZero(m.DealTypes) { // not required + return nil + } + + for i := 0; i < len(m.DealTypes); i++ { + + if err := m.DealTypes[i].Validate(formats); err != nil { + ve := new(errors.Validation) + if stderrors.As(err, &ve) { + return ve.ValidateName("dealTypes" + "." + strconv.Itoa(i)) + } + ce := new(errors.CompositeError) + if stderrors.As(err, &ce) { + return ce.ValidateName("dealTypes" + "." + strconv.Itoa(i)) + } + + return err + } + + } + + return nil +} + func (m *DealListDealRequest) validateStates(formats strfmt.Registry) error { if swag.IsZero(m.States) { // not required return nil @@ -79,6 +111,10 @@ func (m *DealListDealRequest) validateStates(formats strfmt.Registry) error { func (m *DealListDealRequest) ContextValidate(ctx context.Context, formats strfmt.Registry) error { var res []error + if err := m.contextValidateDealTypes(ctx, formats); err != nil { + res = append(res, err) + } + if err := m.contextValidateStates(ctx, formats); err != nil { res = append(res, err) } @@ -89,6 +125,32 @@ func (m *DealListDealRequest) ContextValidate(ctx context.Context, formats strfm return nil } +func (m *DealListDealRequest) contextValidateDealTypes(ctx context.Context, formats strfmt.Registry) error { + + for i := 0; i < len(m.DealTypes); i++ { + + if swag.IsZero(m.DealTypes[i]) { // not required + return nil + } + + if err := m.DealTypes[i].ContextValidate(ctx, formats); err != nil { + ve := new(errors.Validation) + if stderrors.As(err, &ve) { + return ve.ValidateName("dealTypes" + "." + strconv.Itoa(i)) + } + ce := new(errors.CompositeError) + if stderrors.As(err, &ce) { + return ce.ValidateName("dealTypes" + "." + strconv.Itoa(i)) + } + + return err + } + + } + + return nil +} + func (m *DealListDealRequest) contextValidateStates(ctx context.Context, formats strfmt.Registry) error { for i := 0; i < len(m.States); i++ { diff --git a/client/swagger/models/model_deal.go b/client/swagger/models/model_deal.go index 252591e9..74f3bcb2 100644 --- a/client/swagger/models/model_deal.go +++ b/client/swagger/models/model_deal.go @@ -28,6 +28,9 @@ type ModelDeal struct { // deal Id DealID int64 `json:"dealId,omitempty"` + // deal type + DealType ModelDealType `json:"dealType,omitempty"` + // end epoch EndEpoch int64 `json:"endEpoch,omitempty"` @@ -43,6 +46,9 @@ type ModelDeal struct { // LastVerifiedAt is the last time the deal was verified as active by the tracker LastVerifiedAt string `json:"lastVerifiedAt,omitempty"` + // NextChallengeEpoch is the next epoch when a challenge proof is due + NextChallengeEpoch int64 `json:"nextChallengeEpoch,omitempty"` + // piece cid PieceCid string `json:"pieceCid,omitempty"` @@ -52,6 +58,12 @@ type ModelDeal struct { // price Price string `json:"price,omitempty"` + // PDP-specific fields (only populated for DealTypePDP) + ProofSetID int64 `json:"proofSetId,omitempty"` + + // ProofSetLive indicates if the proof set is live (actively being challenged) + ProofSetLive bool `json:"proofSetLive,omitempty"` + // proposal Id ProposalID string `json:"proposalId,omitempty"` @@ -81,6 +93,10 @@ type ModelDeal struct { func (m *ModelDeal) Validate(formats strfmt.Registry) error { var res []error + if err := m.validateDealType(formats); err != nil { + res = append(res, err) + } + if err := m.validateState(formats); err != nil { res = append(res, err) } @@ -91,6 +107,27 @@ func (m *ModelDeal) Validate(formats strfmt.Registry) error { return nil } +func (m *ModelDeal) validateDealType(formats strfmt.Registry) error { + if swag.IsZero(m.DealType) { // not required + return nil + } + + if err := m.DealType.Validate(formats); err != nil { + ve := new(errors.Validation) + if stderrors.As(err, &ve) { + return ve.ValidateName("dealType") + } + ce := new(errors.CompositeError) + if stderrors.As(err, &ce) { + return ce.ValidateName("dealType") + } + + return err + } + + return nil +} + func (m *ModelDeal) validateState(formats strfmt.Registry) error { if swag.IsZero(m.State) { // not required return nil @@ -116,6 +153,10 @@ func (m *ModelDeal) validateState(formats strfmt.Registry) error { func (m *ModelDeal) ContextValidate(ctx context.Context, formats strfmt.Registry) error { var res []error + if err := m.contextValidateDealType(ctx, formats); err != nil { + res = append(res, err) + } + if err := m.contextValidateState(ctx, formats); err != nil { res = append(res, err) } @@ -126,6 +167,28 @@ func (m *ModelDeal) ContextValidate(ctx context.Context, formats strfmt.Registry return nil } +func (m *ModelDeal) contextValidateDealType(ctx context.Context, formats strfmt.Registry) error { + + if swag.IsZero(m.DealType) { // not required + return nil + } + + if err := m.DealType.ContextValidate(ctx, formats); err != nil { + ve := new(errors.Validation) + if stderrors.As(err, &ve) { + return ve.ValidateName("dealType") + } + ce := new(errors.CompositeError) + if stderrors.As(err, &ce) { + return ce.ValidateName("dealType") + } + + return err + } + + return nil +} + func (m *ModelDeal) contextValidateState(ctx context.Context, formats strfmt.Registry) error { if swag.IsZero(m.State) { // not required diff --git a/client/swagger/models/model_deal_type.go b/client/swagger/models/model_deal_type.go new file mode 100644 index 00000000..5d375164 --- /dev/null +++ b/client/swagger/models/model_deal_type.go @@ -0,0 +1,78 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "context" + "encoding/json" + + "github.com/go-openapi/errors" + "github.com/go-openapi/strfmt" + "github.com/go-openapi/validate" +) + +// ModelDealType model deal type +// +// swagger:model model.DealType +type ModelDealType string + +func NewModelDealType(value ModelDealType) *ModelDealType { + return &value +} + +// Pointer returns a pointer to a freshly-allocated ModelDealType. +func (m ModelDealType) Pointer() *ModelDealType { + return &m +} + +const ( + + // ModelDealTypeMarket captures enum value "market" + ModelDealTypeMarket ModelDealType = "market" + + // ModelDealTypePdp captures enum value "pdp" + ModelDealTypePdp ModelDealType = "pdp" +) + +// for schema +var modelDealTypeEnum []any + +func init() { + var res []ModelDealType + if err := json.Unmarshal([]byte(`["market","pdp"]`), &res); err != nil { + panic(err) + } + for _, v := range res { + modelDealTypeEnum = append(modelDealTypeEnum, v) + } +} + +func (m ModelDealType) validateModelDealTypeEnum(path, location string, value ModelDealType) error { + if err := validate.EnumCase(path, location, value, modelDealTypeEnum, true); err != nil { + return err + } + return nil +} + +// Validate validates this model deal type +func (m ModelDealType) Validate(formats strfmt.Registry) error { + var res []error + + // value enum + if err := m.validateModelDealTypeEnum("", "body", m); err != nil { + return err + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +// ContextValidate validates this model deal type based on context it is used +func (m ModelDealType) ContextValidate(ctx context.Context, formats strfmt.Registry) error { + return nil +} diff --git a/cmd/app.go b/cmd/app.go index 40165856..70eae983 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -154,6 +154,7 @@ Upgrading: run.DatasetWorkerCmd, run.ContentProviderCmd, run.DealTrackerCmd, + run.PDPTrackerCmd, run.DealPusherCmd, run.DownloadServerCmd, }, diff --git a/cmd/deal/list.go b/cmd/deal/list.go index 72e33e5b..394717eb 100644 --- a/cmd/deal/list.go +++ b/cmd/deal/list.go @@ -34,6 +34,10 @@ var ListCmd = &cli.Command{ Name: "state", Usage: "Filter deals by state: proposed, published, active, expired, proposal_expired, slashed", }, + &cli.StringSliceFlag{ + Name: "deal-type", + Usage: "Filter deals by type: market (legacy f05), pdp (f41 PDP deals)", + }, }, Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) @@ -47,6 +51,7 @@ var ListCmd = &cli.Command{ Schedules: underscore.Map(c.IntSlice("schedules"), func(i int) uint32 { return uint32(i) }), Providers: c.StringSlice("provider"), States: underscore.Map(c.StringSlice("state"), func(s string) model.DealState { return model.DealState(s) }), + DealTypes: underscore.Map(c.StringSlice("deal-type"), func(s string) model.DealType { return model.DealType(s) }), }) if err != nil { return errors.WithStack(err) diff --git a/cmd/run/pdptracker.go b/cmd/run/pdptracker.go new file mode 100644 index 00000000..31aac8ad --- /dev/null +++ b/cmd/run/pdptracker.go @@ -0,0 +1,60 @@ +package run + +import ( + "time" + + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/service" + "github.com/data-preservation-programs/singularity/service/pdptracker" + "github.com/urfave/cli/v2" +) + +var PDPTrackerCmd = &cli.Command{ + Name: "pdp-tracker", + Usage: "Start a PDP deal tracker that tracks f41 PDP deals for all relevant wallets", + Description: `The PDP tracker monitors Proof of Data Possession (PDP) deals on the Filecoin network. +Unlike legacy f05 market deals, PDP deals use proof sets managed through the PDPVerifier contract +where data is verified through cryptographic challenges. + +This tracker: +- Monitors proof sets for tracked wallets +- Updates deal status based on on-chain proof set state +- Tracks challenge epochs and live status + +Note: Full functionality requires the go-synapse library integration. +See: https://github.com/data-preservation-programs/go-synapse`, + Flags: []cli.Flag{ + &cli.DurationFlag{ + Name: "interval", + Usage: "How often to check for PDP deal updates", + Value: 10 * time.Minute, + }, + &cli.StringFlag{ + Name: "lotus-api", + Usage: "Lotus RPC API endpoint", + EnvVars: []string{"LOTUS_API"}, + }, + }, + Action: func(c *cli.Context) error { + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return err + } + defer closer.Close() + + // Note: PDPClient is nil until go-synapse integration is complete + // When go-synapse is available, instantiate the client here: + // pdpClient := synapse.NewPDPClient(c.String("lotus-api"), ...) + var pdpClient pdptracker.PDPClient = nil + + tracker := pdptracker.NewPDPTracker( + db, + c.Duration("interval"), + c.String("lotus-api"), + pdpClient, + false, + ) + + return service.StartServers(c.Context, pdptracker.Logger, &tracker) + }, +} diff --git a/docs/en/SUMMARY.md b/docs/en/SUMMARY.md index cd214212..e1668d73 100644 --- a/docs/en/SUMMARY.md +++ b/docs/en/SUMMARY.md @@ -57,6 +57,7 @@ * [Dataset Worker](cli-reference/run/dataset-worker.md) * [Content Provider](cli-reference/run/content-provider.md) * [Deal Tracker](cli-reference/run/deal-tracker.md) + * [Pdp Tracker](cli-reference/run/pdp-tracker.md) * [Deal Pusher](cli-reference/run/deal-pusher.md) * [Download Server](cli-reference/run/download-server.md) * [Wallet](cli-reference/wallet/README.md) diff --git a/docs/en/cli-reference/deal/list.md b/docs/en/cli-reference/deal/list.md index d72a23c8..53a6f999 100644 --- a/docs/en/cli-reference/deal/list.md +++ b/docs/en/cli-reference/deal/list.md @@ -14,6 +14,7 @@ OPTIONS: --schedule value [ --schedule value ] Filter deals by schedule --provider value [ --provider value ] Filter deals by provider --state value [ --state value ] Filter deals by state: proposed, published, active, expired, proposal_expired, slashed + --deal-type value [ --deal-type value ] Filter deals by type: market (legacy f05), pdp (f41 PDP deals) --help, -h show help ``` {% endcode %} diff --git a/docs/en/cli-reference/run/README.md b/docs/en/cli-reference/run/README.md index 5ef815a4..64452e5d 100644 --- a/docs/en/cli-reference/run/README.md +++ b/docs/en/cli-reference/run/README.md @@ -13,6 +13,7 @@ COMMANDS: dataset-worker Start a dataset preparation worker to process dataset scanning and preparation tasks content-provider Start a content provider that serves retrieval requests deal-tracker Start a deal tracker that tracks the deal for all relevant wallets + pdp-tracker Start a PDP deal tracker that tracks f41 PDP deals for all relevant wallets deal-pusher Start a deal pusher that monitors deal schedules and pushes deals to storage providers download-server An HTTP server connecting to remote metadata API to offer CAR file downloads help, h Shows a list of commands or help for one command diff --git a/docs/en/cli-reference/run/pdp-tracker.md b/docs/en/cli-reference/run/pdp-tracker.md new file mode 100644 index 00000000..4e7f0b20 --- /dev/null +++ b/docs/en/cli-reference/run/pdp-tracker.md @@ -0,0 +1,29 @@ +# Start a PDP deal tracker that tracks f41 PDP deals for all relevant wallets + +{% code fullWidth="true" %} +``` +NAME: + singularity run pdp-tracker - Start a PDP deal tracker that tracks f41 PDP deals for all relevant wallets + +USAGE: + singularity run pdp-tracker [command options] + +DESCRIPTION: + The PDP tracker monitors Proof of Data Possession (PDP) deals on the Filecoin network. + Unlike legacy f05 market deals, PDP deals use proof sets managed through the PDPVerifier contract + where data is verified through cryptographic challenges. + + This tracker: + - Monitors proof sets for tracked wallets + - Updates deal status based on on-chain proof set state + - Tracks challenge epochs and live status + + Note: Full functionality requires the go-synapse library integration. + See: https://github.com/data-preservation-programs/go-synapse + +OPTIONS: + --interval value How often to check for PDP deal updates (default: 10m0s) + --lotus-api value Lotus RPC API endpoint [$LOTUS_API] + --help, -h show help +``` +{% endcode %} diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 846b98b5..364b2c6f 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -6162,6 +6162,13 @@ const docTemplate = `{ "deal.ListDealRequest": { "type": "object", "properties": { + "dealTypes": { + "description": "deal type filter (market for f05, pdp for f41)", + "type": "array", + "items": { + "$ref": "#/definitions/model.DealType" + } + }, "preparations": { "description": "preparation ID or name filter", "type": "array", @@ -6489,6 +6496,9 @@ const docTemplate = `{ "dealId": { "type": "integer" }, + "dealType": { + "$ref": "#/definitions/model.DealType" + }, "endEpoch": { "type": "integer" }, @@ -6505,6 +6515,10 @@ const docTemplate = `{ "description": "LastVerifiedAt is the last time the deal was verified as active by the tracker", "type": "string" }, + "nextChallengeEpoch": { + "description": "NextChallengeEpoch is the next epoch when a challenge proof is due", + "type": "integer" + }, "pieceCid": { "type": "string" }, @@ -6514,6 +6528,14 @@ const docTemplate = `{ "price": { "type": "string" }, + "proofSetId": { + "description": "PDP-specific fields (only populated for DealTypePDP)", + "type": "integer" + }, + "proofSetLive": { + "description": "ProofSetLive indicates if the proof set is live (actively being challenged)", + "type": "boolean" + }, "proposalId": { "type": "string" }, @@ -6564,6 +6586,17 @@ const docTemplate = `{ "DealErrored" ] }, + "model.DealType": { + "type": "string", + "enum": [ + "market", + "pdp" + ], + "x-enum-varnames": [ + "DealTypeMarket", + "DealTypePDP" + ] + }, "model.File": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 0d0e6e31..2c27e1b6 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -6155,6 +6155,13 @@ "deal.ListDealRequest": { "type": "object", "properties": { + "dealTypes": { + "description": "deal type filter (market for f05, pdp for f41)", + "type": "array", + "items": { + "$ref": "#/definitions/model.DealType" + } + }, "preparations": { "description": "preparation ID or name filter", "type": "array", @@ -6482,6 +6489,9 @@ "dealId": { "type": "integer" }, + "dealType": { + "$ref": "#/definitions/model.DealType" + }, "endEpoch": { "type": "integer" }, @@ -6498,6 +6508,10 @@ "description": "LastVerifiedAt is the last time the deal was verified as active by the tracker", "type": "string" }, + "nextChallengeEpoch": { + "description": "NextChallengeEpoch is the next epoch when a challenge proof is due", + "type": "integer" + }, "pieceCid": { "type": "string" }, @@ -6507,6 +6521,14 @@ "price": { "type": "string" }, + "proofSetId": { + "description": "PDP-specific fields (only populated for DealTypePDP)", + "type": "integer" + }, + "proofSetLive": { + "description": "ProofSetLive indicates if the proof set is live (actively being challenged)", + "type": "boolean" + }, "proposalId": { "type": "string" }, @@ -6557,6 +6579,17 @@ "DealErrored" ] }, + "model.DealType": { + "type": "string", + "enum": [ + "market", + "pdp" + ], + "x-enum-varnames": [ + "DealTypeMarket", + "DealTypePDP" + ] + }, "model.File": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index c99d16d4..d6c215da 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -151,6 +151,11 @@ definitions: type: object deal.ListDealRequest: properties: + dealTypes: + description: deal type filter (market for f05, pdp for f41) + items: + $ref: '#/definitions/model.DealType' + type: array preparations: description: preparation ID or name filter items: @@ -392,6 +397,8 @@ definitions: type: string dealId: type: integer + dealType: + $ref: '#/definitions/model.DealType' endEpoch: type: integer errorMessage: @@ -404,12 +411,23 @@ definitions: description: LastVerifiedAt is the last time the deal was verified as active by the tracker type: string + nextChallengeEpoch: + description: NextChallengeEpoch is the next epoch when a challenge proof is + due + type: integer pieceCid: type: string pieceSize: type: integer price: type: string + proofSetId: + description: PDP-specific fields (only populated for DealTypePDP) + type: integer + proofSetLive: + description: ProofSetLive indicates if the proof set is live (actively being + challenged) + type: boolean proposalId: type: string provider: @@ -448,6 +466,14 @@ definitions: - DealRejected - DealSlashed - DealErrored + model.DealType: + enum: + - market + - pdp + type: string + x-enum-varnames: + - DealTypeMarket + - DealTypePDP model.File: properties: attachmentId: diff --git a/handler/deal/list.go b/handler/deal/list.go index 658b78cc..2977b79b 100644 --- a/handler/deal/list.go +++ b/handler/deal/list.go @@ -15,6 +15,7 @@ type ListDealRequest struct { Schedules []uint32 `json:"schedules"` // schedule id filter Providers []string `json:"providers"` // provider filter States []model.DealState `json:"states"` // state filter + DealTypes []model.DealType `json:"dealTypes"` // deal type filter (market for f05, pdp for f41) } // ListHandler retrieves a list of deals from the database based on the specified filtering criteria in ListDealRequest. @@ -84,6 +85,10 @@ func (DefaultHandler) ListHandler(ctx context.Context, db *gorm.DB, request List statement = statement.Where("state IN ?", request.States) } + if len(request.DealTypes) > 0 { + statement = statement.Where("deal_type IN ?", request.DealTypes) + } + // We did not create indexes for all above query and it should be fine for now err := db.Where(statement).Find(&deals).Error if err != nil { diff --git a/model/basetypes.go b/model/basetypes.go index 2341d85f..34d6647b 100644 --- a/model/basetypes.go +++ b/model/basetypes.go @@ -287,6 +287,7 @@ const ( DealTracker WorkerType = "deal_tracker" DealPusher WorkerType = "deal_pusher" DatasetWorker WorkerType = "dataset_worker" + PDPTracker WorkerType = "pdp_tracker" ) const ( diff --git a/model/migrate.go b/model/migrate.go index db56440a..c856bffc 100644 --- a/model/migrate.go +++ b/model/migrate.go @@ -117,6 +117,11 @@ func AutoMigrate(db *gorm.DB) error { return errors.Wrap(err, "failed to infer piece types") } + // Set deal_type for existing deals that predate the column + if err := inferDealTypes(db); err != nil { + return errors.Wrap(err, "failed to infer deal types") + } + return nil } @@ -306,6 +311,35 @@ func inferPieceTypes(db *gorm.DB) error { return nil } +// inferDealTypes sets deal_type for deals that predate the column. +// All existing deals are assumed to be legacy market deals (f05). +// This is idempotent - only updates rows where deal_type is NULL or empty. +func inferDealTypes(db *gorm.DB) error { + // check if any deals need updating + var count int64 + err := db.Raw(`SELECT COUNT(*) FROM deals WHERE deal_type IS NULL OR deal_type = ''`).Scan(&count).Error + if err != nil { + // table might not exist or column missing + logger.Debugw("skipping deal type inference", "error", err) + return nil + } + + if count == 0 { + return nil + } + + logger.Infow("setting deal type for legacy deals", "count", count) + + // All existing deals are legacy market deals + result := db.Exec(`UPDATE deals SET deal_type = 'market' WHERE deal_type IS NULL OR deal_type = ''`) + if result.Error != nil { + return errors.Wrap(result.Error, "failed to set deal types") + } + + logger.Infow("set deal types", "updated", result.RowsAffected) + return nil +} + // DropAll removes all tables specified in the Tables slice from the database. // // This function is typically used during development or testing where a clean database diff --git a/model/replication.go b/model/replication.go index 4bcada94..08150da6 100644 --- a/model/replication.go +++ b/model/replication.go @@ -10,6 +10,9 @@ type DealState string type ScheduleState string +// DealType represents the type of deal (legacy market vs PDP) +type DealType string + const ( DealProposed DealState = "proposed" DealPublished DealState = "published" @@ -21,6 +24,13 @@ const ( DealErrored DealState = "error" ) +const ( + // DealTypeMarket represents legacy f05 market actor deals + DealTypeMarket DealType = "market" + // DealTypePDP represents f41 PDP (Proof of Data Possession) deals + DealTypePDP DealType = "pdp" +) + var DealStateStrings = []string{ string(DealProposed), string(DealPublished), @@ -43,6 +53,16 @@ var DealStates = []DealState{ DealErrored, } +var DealTypeStrings = []string{ + string(DealTypeMarket), + string(DealTypePDP), +} + +var DealTypes = []DealType{ + DealTypeMarket, + DealTypePDP, +} + const ( ScheduleActive ScheduleState = "active" SchedulePaused ScheduleState = "paused" @@ -81,6 +101,7 @@ type Deal struct { LastVerifiedAt *time.Time `json:"lastVerifiedAt" table:"verbose;format:2006-01-02 15:04:05"` // LastVerifiedAt is the last time the deal was verified as active by the tracker DealID *uint64 `gorm:"unique" json:"dealId"` State DealState `gorm:"index:idx_pending" json:"state"` + DealType DealType `gorm:"index;default:'market'" json:"dealType"` Provider string `json:"provider"` ProposalID string `json:"proposalId" table:"verbose"` Label string `json:"label" table:"verbose"` @@ -93,6 +114,11 @@ type Deal struct { Verified bool `json:"verified"` ErrorMessage string `json:"errorMessage" table:"verbose"` + // PDP-specific fields (only populated for DealTypePDP) + ProofSetID *uint64 `json:"proofSetId,omitempty" table:"verbose"` // ProofSetID is the on-chain proof set ID for PDP deals + ProofSetLive *bool `json:"proofSetLive,omitempty" table:"verbose"` // ProofSetLive indicates if the proof set is live (actively being challenged) + NextChallengeEpoch *int32 `json:"nextChallengeEpoch,omitempty" table:"verbose"` // NextChallengeEpoch is the next epoch when a challenge proof is due + // Associations ScheduleID *ScheduleID `json:"scheduleId" table:"verbose"` Schedule *Schedule `gorm:"foreignKey:ScheduleID;constraint:OnDelete:SET NULL" json:"schedule,omitempty" swaggerignore:"true" table:"expand"` diff --git a/service/dealtracker/dealtracker.go b/service/dealtracker/dealtracker.go index 36a2714c..c878d27f 100644 --- a/service/dealtracker/dealtracker.go +++ b/service/dealtracker/dealtracker.go @@ -556,6 +556,7 @@ func (d *DealTracker) runOnce(ctx context.Context) error { return db.Create(&model.Deal{ DealID: &dealID, State: newState, + DealType: model.DealTypeMarket, // Legacy market deal (f05) ClientID: deal.Proposal.Client, Provider: deal.Proposal.Provider, Label: deal.Proposal.Label, diff --git a/service/pdptracker/pdptracker.go b/service/pdptracker/pdptracker.go new file mode 100644 index 00000000..434ba6fa --- /dev/null +++ b/service/pdptracker/pdptracker.go @@ -0,0 +1,323 @@ +// Package pdptracker provides a service for tracking PDP (Proof of Data Possession) deals +// using the f41 actor on Filecoin. This is distinct from legacy f05 market deals. +// +// PDP deals use proof sets managed through the PDPVerifier contract, where data is verified +// through cryptographic challenges rather than the traditional sector sealing process. +// +// Note: This package requires the go-synapse library for full functionality. +// See: https://github.com/data-preservation-programs/go-synapse +package pdptracker + +import ( + "context" + "time" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/service/healthcheck" + "github.com/google/uuid" + "github.com/gotidy/ptr" + "github.com/ipfs/go-log/v2" + "gorm.io/gorm" +) + +var ErrAlreadyRunning = errors.New("another PDP tracker worker already running") + +const ( + healthRegisterRetryInterval = time.Minute + cleanupTimeout = 5 * time.Second +) + +var Logger = log.Logger("pdptracker") + +// ProofSetInfo contains information about a PDP proof set retrieved from on-chain state +type ProofSetInfo struct { + ProofSetID uint64 + ClientAddress string // f4 address of the client + ProviderAddress string // Provider/record keeper address + IsLive bool // Whether the proof set is actively being challenged + NextChallengeEpoch int32 // Next epoch when a challenge is due + PieceCIDs []string +} + +// PDPClient is the interface for interacting with PDP on-chain state. +// This will be implemented using the go-synapse library once it's available. +type PDPClient interface { + // GetProofSetsForClient returns all proof sets associated with a client address + GetProofSetsForClient(ctx context.Context, clientAddress string) ([]ProofSetInfo, error) + // GetProofSetInfo returns detailed information about a specific proof set + GetProofSetInfo(ctx context.Context, proofSetID uint64) (*ProofSetInfo, error) + // IsProofSetLive checks if a proof set is actively being challenged + IsProofSetLive(ctx context.Context, proofSetID uint64) (bool, error) + // GetNextChallengeEpoch returns the next challenge epoch for a proof set + GetNextChallengeEpoch(ctx context.Context, proofSetID uint64) (int32, error) +} + +// PDPTracker tracks PDP deals (f41 actor) on the Filecoin network. +// It monitors proof sets and updates deal status based on on-chain state. +type PDPTracker struct { + workerID uuid.UUID + dbNoContext *gorm.DB + interval time.Duration + pdpClient PDPClient + rpcURL string + once bool +} + +// NewPDPTracker creates a new PDP deal tracker. +// +// Parameters: +// - db: Database connection for storing deal information +// - interval: How often to check for updates +// - rpcURL: Filecoin RPC endpoint URL +// - pdpClient: Client for interacting with PDP contracts (can be nil for stub mode) +// - once: If true, run only once instead of continuously +func NewPDPTracker( + db *gorm.DB, + interval time.Duration, + rpcURL string, + pdpClient PDPClient, + once bool, +) PDPTracker { + return PDPTracker{ + workerID: uuid.New(), + dbNoContext: db, + interval: interval, + rpcURL: rpcURL, + pdpClient: pdpClient, + once: once, + } +} + +func (*PDPTracker) Name() string { + return "PDPTracker" +} + +// Start begins the PDP tracker service. +func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { + if p.pdpClient == nil { + Logger.Warn("PDP client not configured - PDP tracking will be disabled until go-synapse is integrated") + if exitErr != nil { + exitErr <- nil + } + return nil + } + + var regTimer *time.Timer + for { + alreadyRunning, err := healthcheck.Register(ctx, p.dbNoContext, p.workerID, model.PDPTracker, false) + if err != nil { + return errors.WithStack(err) + } + if !alreadyRunning { + break + } + + Logger.Warnw("another PDP tracker worker already running") + if p.once { + return ErrAlreadyRunning + } + Logger.Warn("retrying in 1 minute") + if regTimer == nil { + regTimer = time.NewTimer(healthRegisterRetryInterval) + defer regTimer.Stop() + } else { + regTimer.Reset(healthRegisterRetryInterval) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-regTimer.C: + } + } + + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + + healthcheckDone := make(chan struct{}) + go func() { + defer close(healthcheckDone) + healthcheck.StartReportHealth(ctx, p.dbNoContext, p.workerID, model.PDPTracker) + Logger.Info("PDP tracker health report stopped") + }() + + go func() { + var timer *time.Timer + var runErr error + for { + runErr = p.runOnce(ctx) + if runErr != nil { + if ctx.Err() != nil { + if errors.Is(runErr, context.Canceled) { + runErr = nil + } + Logger.Info("PDP tracker run stopped") + break + } + Logger.Errorw("failed to run PDP tracker once", "error", runErr) + } + if p.once { + Logger.Info("PDP tracker run once done") + break + } + if timer == nil { + timer = time.NewTimer(p.interval) + defer timer.Stop() + } else { + timer.Reset(p.interval) + } + + var stopped bool + select { + case <-ctx.Done(): + stopped = true + case <-timer.C: + } + if stopped { + Logger.Info("PDP tracker run stopped") + break + } + } + + cancel() + + ctx2, cancel2 := context.WithTimeout(context.Background(), cleanupTimeout) + defer cancel2() + //nolint:contextcheck + err := p.cleanup(ctx2) + if err != nil { + Logger.Errorw("failed to cleanup PDP tracker", "error", err) + } else { + Logger.Info("PDP tracker cleanup done") + } + + <-healthcheckDone + + if exitErr != nil { + exitErr <- runErr + } + }() + + return nil +} + +func (p *PDPTracker) cleanup(ctx context.Context) error { + return database.DoRetry(ctx, func() error { + return p.dbNoContext.WithContext(ctx).Where("id = ?", p.workerID).Delete(&model.Worker{}).Error + }) +} + +// runOnce performs a single cycle of PDP deal tracking. +// It queries wallets, fetches their PDP proof sets, and updates deal status. +func (p *PDPTracker) runOnce(ctx context.Context) error { + if p.pdpClient == nil { + return nil + } + + db := p.dbNoContext.WithContext(ctx) + + // Get all wallets to track + var wallets []model.Wallet + err := db.Find(&wallets).Error + if err != nil { + return errors.Wrap(err, "failed to get wallets from database") + } + + now := time.Now() + var updated, inserted int64 + + for _, wallet := range wallets { + Logger.Infof("tracking PDP deals for wallet %s", wallet.ID) + + // Get proof sets for this wallet + proofSets, err := p.pdpClient.GetProofSetsForClient(ctx, wallet.Address) + if err != nil { + Logger.Warnw("failed to get proof sets for wallet", "wallet", wallet.ID, "error", err) + continue + } + + for _, ps := range proofSets { + for _, pieceCID := range ps.PieceCIDs { + // Check if we already have this deal tracked + var existingDeal model.Deal + err := db.Where("proof_set_id = ? AND piece_cid = ? AND deal_type = ?", + ps.ProofSetID, pieceCID, model.DealTypePDP).First(&existingDeal).Error + + if err == nil { + // Deal exists, check if status changed + needsUpdate := false + updates := map[string]any{} + + if existingDeal.ProofSetLive == nil || *existingDeal.ProofSetLive != ps.IsLive { + updates["proof_set_live"] = ps.IsLive + needsUpdate = true + } + if existingDeal.NextChallengeEpoch == nil || *existingDeal.NextChallengeEpoch != ps.NextChallengeEpoch { + updates["next_challenge_epoch"] = ps.NextChallengeEpoch + needsUpdate = true + } + + // Update state based on proof set status + newState := p.getPDPDealState(ps) + if existingDeal.State != newState { + updates["state"] = newState + needsUpdate = true + } + + if needsUpdate { + updates["last_verified_at"] = now + err = database.DoRetry(ctx, func() error { + return db.Model(&model.Deal{}).Where("id = ?", existingDeal.ID).Updates(updates).Error + }) + if err != nil { + Logger.Errorw("failed to update PDP deal", "dealID", existingDeal.ID, "error", err) + continue + } + Logger.Infow("PDP deal updated", "dealID", existingDeal.ID, "proofSetID", ps.ProofSetID) + updated++ + } + } else if errors.Is(err, gorm.ErrRecordNotFound) { + // New PDP deal, insert it + newState := p.getPDPDealState(ps) + newDeal := model.Deal{ + DealType: model.DealTypePDP, + State: newState, + ClientID: wallet.ID, + Provider: ps.ProviderAddress, + PieceCID: model.CID{}, // TODO: Parse CID from string + ProofSetID: ptr.Of(ps.ProofSetID), + ProofSetLive: ptr.Of(ps.IsLive), + NextChallengeEpoch: ptr.Of(ps.NextChallengeEpoch), + LastVerifiedAt: ptr.Of(now), + } + + err = database.DoRetry(ctx, func() error { + return db.Create(&newDeal).Error + }) + if err != nil { + Logger.Errorw("failed to insert PDP deal", "proofSetID", ps.ProofSetID, "error", err) + continue + } + Logger.Infow("PDP deal inserted", "proofSetID", ps.ProofSetID, "state", newState) + inserted++ + } else { + Logger.Errorw("failed to query existing PDP deal", "error", err) + } + } + } + } + + Logger.Infof("PDP tracker: updated %d deals, inserted %d deals", updated, inserted) + return nil +} + +// getPDPDealState determines the deal state based on proof set status +func (p *PDPTracker) getPDPDealState(ps ProofSetInfo) model.DealState { + if ps.IsLive { + return model.DealActive + } + // If not live, it might be proposed (waiting for first challenge) or expired + // This logic may need refinement based on actual PDP contract semantics + return model.DealPublished +}