From e88d69945c754a49cc5a7fe598deea93f44f07e1 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Fri, 16 Jan 2026 14:17:52 +0100 Subject: [PATCH 1/3] Refactor job handling to introduce context usage and modular BaseJob components: `JobBase`, `JobRedis`, and `BaseJobDB`. --- cdb/db.go | 28 +++ worker/base_job.go | 116 ++++++++++ worker/base_job_db.go | 208 ++++++++++++++++++ worker/base_job_ev.go | 11 + worker/base_job_redis.go | 74 +++++++ worker/dashboard.go | 11 +- worker/job.go | 250 ---------------------- worker/job_feed_daemon_ping.go | 50 +++-- worker/job_feed_daemon_status.go | 238 ++++++-------------- worker/job_feed_instance_resource_info.go | 36 ++-- worker/job_feed_instance_status.go | 52 +++-- worker/job_feed_node_disk.go | 56 ++--- worker/job_feed_object_config.go | 32 +-- worker/job_feed_system.go | 110 +++++----- worker/worker.go | 19 +- 15 files changed, 703 insertions(+), 588 deletions(-) create mode 100644 worker/base_job.go create mode 100644 worker/base_job_db.go create mode 100644 worker/base_job_ev.go create mode 100644 worker/base_job_redis.go delete mode 100644 worker/job.go diff --git a/cdb/db.go b/cdb/db.go index f0ac955..30eeb1c 100644 --- a/cdb/db.go +++ b/cdb/db.go @@ -23,6 +23,9 @@ type ( DBLck *DBLocker Session *Session + + dbPool *sql.DB + hasTx bool } // DBLocker combines a database connection and a sync.Locker @@ -57,6 +60,31 @@ func InitDbLocker(db *sql.DB) *DBLocker { return dbLocker } +func New(dbPool *sql.DB) *DB { + return &DB{DB: dbPool, DBLck: InitDbLocker(dbPool), dbPool: dbPool} +} + +func (odb *DB) CreateTx(ctx context.Context, opts *sql.TxOptions) error { + if odb.hasTx { + return fmt.Errorf("already in a transaction") + } + if tx, err := odb.dbPool.BeginTx(ctx, opts); err != nil { + return err + } else { + odb.DB = tx + odb.hasTx = true + return nil + } +} + +func (odb *DB) CreateSession(ev eventPublisher) { + odb.Session = &Session{ + db: odb.DB, + ev: ev, + tables: make(map[string]struct{}), + } +} + func (oDb *DB) Commit() error { tx, ok := oDb.DB.(DBTxer) if !ok { diff --git a/worker/base_job.go b/worker/base_job.go new file mode 100644 index 0000000..c2e0359 --- /dev/null +++ b/worker/base_job.go @@ -0,0 +1,116 @@ +package worker + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/opensvc/oc3/cdb" +) + +type ( + JobBase struct { + name string + detail string + } + + operation struct { + desc string + do func() error + doCtx func(context.Context) error + + // blocking stops the operation chain on operation error + blocking bool + + // condition skips operation if condition returns false + condition func() bool + } + + LogResulter interface { + LogResult() + } + + Operationer interface { + Operations() []operation + } + + DBGetter interface { + DB() cdb.DBOperater + } +) + +func RunJob(ctx context.Context, j JobRunner) error { + name := j.Name() + detail := j.Detail() + defer logDurationInfo(fmt.Sprintf("%s %s", name, detail), time.Now()) + slog.Info(fmt.Sprintf("%s starting %s", name, detail)) + + ops := j.Operations() + + err := runOps(ctx, ops...) + if err != nil { + if tx, ok := j.(cdb.DBTxer); ok { + slog.Debug(fmt.Sprintf("%s rollbacking on error %s", name, detail)) + if err := tx.Rollback(); err != nil { + slog.Error(fmt.Sprintf("%s rollback on error failed %s: %s", name, detail, err)) + } + } + return err + } else if tx, ok := j.(cdb.DBTxer); ok { + slog.Debug(fmt.Sprintf("%s commiting %s", name, detail)) + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit: %w", err) + } + } + if r, ok := j.(LogResulter); ok { + r.LogResult() + } + slog.Info(fmt.Sprintf("%s done %s", name, detail)) + return nil +} + +func (j *JobBase) Name() string { + return j.name +} + +func (j *JobBase) Detail() string { + return j.detail +} + +func runOps(ctx context.Context, ops ...operation) error { + for _, op := range ops { + var err error + if op.condition != nil && !op.condition() { + continue + } + begin := time.Now() + if op.doCtx != nil { + err = op.doCtx(ctx) + } else if op.do != nil { + err = op.do() + } + duration := time.Since(begin) + if err != nil { + operationDuration. + With(prometheus.Labels{"desc": op.desc, "status": operationStatusFailed}). + Observe(duration.Seconds()) + if op.blocking { + return err + } + slog.Warn("%s: non blocking error: %s", op.desc, err) + continue + } + operationDuration. + With(prometheus.Labels{"desc": op.desc, "status": operationStatusOk}). + Observe(duration.Seconds()) + slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", op.desc, duration)) + } + return nil +} + +func logDurationInfo(s string, begin time.Time) { + slog.Info(fmt.Sprintf("STAT: %s elapse: %s", s, time.Since(begin))) +} diff --git a/worker/base_job_db.go b/worker/base_job_db.go new file mode 100644 index 0000000..a342d46 --- /dev/null +++ b/worker/base_job_db.go @@ -0,0 +1,208 @@ +package worker + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "time" + + "github.com/opensvc/oc3/cdb" +) + +type ( + JobDB struct { + dbPool *sql.DB + + // db is the generic DB operator + db cdb.DBOperater + + // oDb is the DB collector helper + oDb *cdb.DB + + now time.Time + } +) + +var ( + _ cdb.DBTxer = &JobDB{} +) + +func (j *JobDB) PrepareDB(ctx context.Context, dbPool *sql.DB, ev EventPublisher, withTx bool) error { + if j == nil { + return fmt.Errorf("nil JobDB") + } + odb := cdb.New(dbPool) + if withTx { + if err := odb.CreateTx(ctx, nil); err != nil { + return err + } + } + odb.CreateSession(ev) + + j.oDb = odb + + j.db = odb.DB + return nil +} + +func (j *JobDB) Commit() error { + return j.oDb.Commit() +} + +func (j *JobDB) Rollback() error { + return j.oDb.Rollback() +} + +func (j *JobDB) DB() cdb.DBOperater { + return j.db +} + +func (j *JobDB) dbNow(ctx context.Context) (err error) { + rows, err := j.db.QueryContext(ctx, "SELECT NOW()") + if err != nil { + return err + } + if rows == nil { + return fmt.Errorf("no result rows for SELECT NOW()") + } + defer rows.Close() + if !rows.Next() { + return fmt.Errorf("no result rows next for SELECT NOW()") + } + if err := rows.Scan(&j.now); err != nil { + return err + } + return nil +} + +func (d *JobDB) dbUpdateInstance(ctx context.Context, iStatus *instanceData, objID string, nodeID string, objectName string, nodename string, obj *cdb.DBObject, instanceMonitorStates map[string]bool, node *cdb.DBNode, beginInstance time.Time, changes map[string]struct{}) error { + iStatus.SvcID = objID + iStatus.NodeID = nodeID + _, isChanged := changes[objectName+"@"+nodename] + if !isChanged && obj.AvailStatus != "undef" { + slog.Debug(fmt.Sprintf("ping instance %s@%s", objectName, nodename)) + changes, err := d.oDb.InstancePing(ctx, objID, nodeID) + if err != nil { + return fmt.Errorf("dbUpdateInstances can't ping instance %s@%s: %w", objectName, nodename, err) + } else if changes { + // the instance already existed, and the updated tstamp has been refreshed + // skip the inserts/updates + return nil + } + } + instanceMonitorStates[iStatus.MonSmonStatus] = true + if iStatus.encap == nil { + subNodeID, _, _, err := d.oDb.TranslateEncapNodename(ctx, objID, nodeID) + if err != nil { + return err + } + if subNodeID != "" && subNodeID != nodeID { + slog.Debug(fmt.Sprintf("dbUpdateInstances skip for %s@%s subNodeID:%s vs nodeID: %subNodeID", objectName, nodename, subNodeID, nodeID)) + return nil + } + if iStatus.resources == nil { + // scaler or wrapper, for example + if err := d.oDb.InstanceDeleteStatus(ctx, objID, nodeID); err != nil { + return fmt.Errorf("dbUpdateInstances delete status %s@%s: %w", objID, nodeID, err) + } + if err := d.oDb.InstanceResourcesDelete(ctx, objID, nodeID); err != nil { + return fmt.Errorf("dbUpdateInstances delete resources %s@%s: %w", objID, nodeID, err) + } + } else { + if err := d.instanceStatusUpdate(ctx, objectName, nodename, iStatus); err != nil { + return fmt.Errorf("dbUpdateInstances update status %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) + } + if err := d.instanceResourceUpdate(ctx, objectName, nodename, iStatus); err != nil { + return fmt.Errorf("dbUpdateInstances update resource %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) + } + slog.Debug(fmt.Sprintf("dbUpdateInstances deleting obsolete resources %s@%s", objectName, nodename)) + if err := d.oDb.InstanceResourcesDeleteObsolete(ctx, objID, nodeID, d.now); err != nil { + return fmt.Errorf("dbUpdateInstances delete obsolete resources %s@%s: %w", objID, nodeID, err) + } + } + } else { + if iStatus.resources == nil { + // scaler or wrapper, for example + if err := d.oDb.InstanceDeleteStatus(ctx, objID, nodeID); err != nil { + return fmt.Errorf("dbUpdateInstances delete status %s@%s: %w", objID, nodeID, err) + } + if err := d.oDb.InstanceResourcesDelete(ctx, objID, nodeID); err != nil { + return fmt.Errorf("dbUpdateInstances delete resources %s@%s: %w", objID, nodeID, err) + } + } else { + for _, containerStatus := range iStatus.Containers() { + slog.Debug(fmt.Sprintf("dbUpdateInstances from container status %s@%s monVmName: %s monVmType: %s", objectName, nodename, containerStatus.MonVmName, containerStatus.MonVmType)) + if containerStatus == nil { + continue + } + if containerStatus.fromOutsideStatus == "up" { + slog.Debug(fmt.Sprintf("dbUpdateInstances nodeContainerUpdateFromParentNode %s@%s encap hostname %s", + objID, nodeID, containerStatus.MonVmName)) + if err := d.oDb.NodeContainerUpdateFromParentNode(ctx, containerStatus.MonVmName, obj.App, node); err != nil { + return fmt.Errorf("dbUpdateInstances nodeContainerUpdateFromParentNode %s@%s encap hostname %s: %w", + objID, nodeID, containerStatus.MonVmName, err) + } + } + + if err := d.instanceStatusUpdate(ctx, objID, nodeID, containerStatus); err != nil { + return fmt.Errorf("dbUpdateInstances update container %s %s@%s (%s@%s): %w", + containerStatus.MonVmName, objID, nodeID, objectName, nodename, err) + } + if err := d.instanceResourceUpdate(ctx, objectName, nodename, iStatus); err != nil { + return fmt.Errorf("dbUpdateInstances update resource %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) + } + } + slog.Debug(fmt.Sprintf("dbUpdateInstances deleting obsolete container resources %s@%s", objectName, nodename)) + if err := d.oDb.InstanceResourcesDeleteObsolete(ctx, objID, nodeID, d.now); err != nil { + return fmt.Errorf("dbUpdateInstances delete obsolete container resources %s@%s: %w", objID, nodeID, err) + } + } + } + if err := d.oDb.DashboardInstanceFrozenUpdate(ctx, objID, nodeID, obj.Env, iStatus.MonFrozen > 0); err != nil { + return fmt.Errorf("dbUpdateInstances update dashboard instance frozen %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) + } + if err := d.oDb.DashboardDeleteInstanceNotUpdated(ctx, objID, nodeID); err != nil { + return fmt.Errorf("dbUpdateInstances update dashboard instance not updated %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) + } + // TODO: verify if we need a placement non optimal alert for object/instance + // om2 has: monitor.services.''.placement = non-optimal + // om3 has: cluster.object..placement_state = non-optimal + // cluster.node..instance..monitor.is_ha_leader + // cluster.node..instance..monitor.is_leader + // collector v2 calls update_dash_service_not_on_primary (broken since no DEFAULT.autostart_node values) + + slog.Debug(fmt.Sprintf("STAT: dbUpdateInstances instance duration %s@%s %s", objectName, nodename, time.Since(beginInstance))) + return nil +} + +func (d *JobDB) pushFromTableChanges(ctx context.Context) error { + return d.oDb.Session.NotifyChanges(ctx) +} + +func (d *JobDB) instanceStatusUpdate(ctx context.Context, objName string, nodename string, iStatus *instanceData) error { + slog.Debug(fmt.Sprintf("updating instance status %s@%s (%s@%s)", objName, nodename, iStatus.SvcID, iStatus.NodeID)) + if err := d.oDb.InstanceStatusUpdate(ctx, &iStatus.DBInstanceStatus); err != nil { + return fmt.Errorf("update instance status: %w", err) + } + slog.Debug(fmt.Sprintf("instanceStatusUpdate updating status log %s@%s (%s@%s)", objName, nodename, iStatus.SvcID, iStatus.NodeID)) + err := d.oDb.InstanceStatusLogUpdate(ctx, &iStatus.DBInstanceStatus) + if err != nil { + return fmt.Errorf("update instance status log: %w", err) + } + return nil +} + +func (d *JobDB) instanceResourceUpdate(ctx context.Context, objName string, nodename string, iStatus *instanceData) error { + for _, res := range iStatus.InstanceResources() { + slog.Debug(fmt.Sprintf("updating instance resource %s@%s %s (%s@%s)", objName, nodename, res.RID, iStatus.SvcID, iStatus.NodeID)) + if err := d.oDb.InstanceResourceUpdate(ctx, res); err != nil { + return fmt.Errorf("update resource %s: %w", res.RID, err) + } + slog.Debug(fmt.Sprintf("updating instance resource log %s@%s %s (%s@%s)", objName, nodename, res.RID, iStatus.SvcID, iStatus.NodeID)) + if err := d.oDb.InstanceResourceLogUpdate(ctx, res); err != nil { + return fmt.Errorf("update resource log %s: %w", res.RID, err) + } + } + return nil +} diff --git a/worker/base_job_ev.go b/worker/base_job_ev.go new file mode 100644 index 0000000..27594cf --- /dev/null +++ b/worker/base_job_ev.go @@ -0,0 +1,11 @@ +package worker + +type ( + JobEv struct { + ev EventPublisher + } +) + +func (j *JobEv) SetEv(ev EventPublisher) { + j.ev = ev +} diff --git a/worker/base_job_redis.go b/worker/base_job_redis.go new file mode 100644 index 0000000..93f3e8d --- /dev/null +++ b/worker/base_job_redis.go @@ -0,0 +1,74 @@ +package worker + +import ( + "context" + "fmt" + "strings" + + "github.com/go-redis/redis/v8" + + "github.com/opensvc/oc3/cachekeys" + "github.com/opensvc/oc3/cdb" +) + +type ( + JobRedis struct { + redis *redis.Client + + // cachePendingH is the cache hash used by BaseJob.dropPending: + // HDEL + cachePendingH string + + // cachePendingIDX is the cache id used by BaseJob.dropPending: + // HDEL + cachePendingIDX string + } +) + +func (j *JobRedis) SetRedis(r *redis.Client) { + j.redis = r +} + +// populateFeedObjectConfigForClusterIDH HSET FeedObjectConfigForClusterIDH with the names of objects +// without a config or HDEL FeedObjectConfigForClusterIDH if there are no missing configs. +func (d *JobRedis) populateFeedObjectConfigForClusterIDH(ctx context.Context, clusterID string, byObjectID map[string]*cdb.DBObject) ([]string, error) { + needConfig := make(map[string]struct{}) + for _, obj := range byObjectID { + if obj.NullConfig { + objName := obj.Svcname + // TODO: import om3 naming ? + if strings.Contains(objName, "/svc/") || + strings.Contains(objName, "/vol/") || + strings.HasPrefix(objName, "svc/") || + strings.HasPrefix(objName, "vol/") || + !strings.Contains(objName, "/") { + needConfig[objName] = struct{}{} + } + } + } + + keyName := cachekeys.FeedObjectConfigForClusterIDH + + if len(needConfig) > 0 { + l := make([]string, 0, len(needConfig)) + for k := range needConfig { + l = append(l, k) + } + if err := d.redis.HSet(ctx, keyName, clusterID, strings.Join(l, " ")).Err(); err != nil { + return l, fmt.Errorf("populateFeedObjectConfigForClusterIDH: HSet %s %s: %w", keyName, clusterID, err) + } + return l, nil + } else { + if err := d.redis.HDel(ctx, keyName, clusterID).Err(); err != nil { + return nil, fmt.Errorf("populateFeedObjectConfigForClusterIDH: HDEL %s %s: %w", keyName, clusterID, err) + } + } + return nil, nil +} + +func (j *JobRedis) dropPending(ctx context.Context) error { + if err := j.redis.HDel(ctx, j.cachePendingH, j.cachePendingIDX).Err(); err != nil { + return fmt.Errorf("dropPending: HDEL %s %s: %w", j.cachePendingH, j.cachePendingIDX, err) + } + return nil +} diff --git a/worker/dashboard.go b/worker/dashboard.go index 982ed37..cf0a191 100644 --- a/worker/dashboard.go +++ b/worker/dashboard.go @@ -1,6 +1,7 @@ package worker import ( + "context" "fmt" "time" @@ -44,7 +45,7 @@ func severityFromEnv(dashType int, objEnv string) int { } } -func (d *jobFeedDaemonStatus) updateDashboardObject(obj *cdb.DBObject, doDelete bool, dash dashboarder) error { +func (d *jobFeedDaemonStatus) updateDashboardObject(ctx context.Context, obj *cdb.DBObject, doDelete bool, dash dashboarder) error { objID := obj.SvcID fmtErr := func(err error) error { if err != nil { @@ -54,16 +55,16 @@ func (d *jobFeedDaemonStatus) updateDashboardObject(obj *cdb.DBObject, doDelete } if doDelete { - return fmtErr(d.oDb.DashboardDeleteObjectWithType(d.ctx, objID, dash.Type())) + return fmtErr(d.oDb.DashboardDeleteObjectWithType(ctx, objID, dash.Type())) } - inAckPeriod, err := d.oDb.ObjectInAckUnavailabilityPeriod(d.ctx, objID) + inAckPeriod, err := d.oDb.ObjectInAckUnavailabilityPeriod(ctx, objID) if err != nil { return err } dashType := dash.Type() if inAckPeriod { - return fmtErr(d.oDb.DashboardDeleteObjectWithType(d.ctx, objID, dashType)) + return fmtErr(d.oDb.DashboardDeleteObjectWithType(ctx, objID, dashType)) } else { now := time.Now() dash := cdb.Dashboard{ @@ -76,6 +77,6 @@ func (d *jobFeedDaemonStatus) updateDashboardObject(obj *cdb.DBObject, doDelete Created: now, Updated: now, } - return fmtErr(d.oDb.DashboardUpdateObject(d.ctx, &dash)) + return fmtErr(d.oDb.DashboardUpdateObject(ctx, &dash)) } } diff --git a/worker/job.go b/worker/job.go deleted file mode 100644 index 8b1ff74..0000000 --- a/worker/job.go +++ /dev/null @@ -1,250 +0,0 @@ -package worker - -import ( - "context" - "database/sql" - "fmt" - "log/slog" - "strings" - "time" - - redis "github.com/go-redis/redis/v8" - "github.com/prometheus/client_golang/prometheus" - - "github.com/opensvc/oc3/cachekeys" - "github.com/opensvc/oc3/cdb" -) - -type ( - // BaseJob is a base struct to compose jobs. - BaseJob struct { - ctx context.Context - redis *redis.Client - db cdb.DBOperater - oDb *cdb.DB - ev EventPublisher - - name string - detail string - now time.Time - - // cachePendingH is the cache hash used by BaseJob.dropPending: - // HDEL - cachePendingH string - - // cachePendingIDX is the cache id used by BaseJob.dropPending: - // HDEL - cachePendingIDX string - } - - operation struct { - desc string - do func() error - - // blocking stops the operation chain on operation error - blocking bool - - // condition skips operation if condition returns false - condition func() bool - } - - LogResulter interface { - LogResult() - } - - Operationer interface { - Operations() []operation - } - - DBGetter interface { - DB() cdb.DBOperater - } -) - -func RunJob(j JobRunner) error { - name := j.Name() - detail := j.Detail() - defer logDurationInfo(fmt.Sprintf("%s %s", name, detail), time.Now()) - slog.Info(fmt.Sprintf("%s starting %s", name, detail)) - - ops := j.Operations() - - err := runOps(ops...) - if err != nil { - if tx, ok := j.DB().(cdb.DBTxer); ok { - slog.Debug(fmt.Sprintf("%s rollbacking on error %s", name, detail)) - if err := tx.Rollback(); err != nil { - slog.Error(fmt.Sprintf("%s rollback on error failed %s: %s", name, detail, err)) - } - } - return err - } else if tx, ok := j.DB().(cdb.DBTxer); ok { - slog.Debug(fmt.Sprintf("%s commiting %s", name, detail)) - if err := tx.Commit(); err != nil { - return fmt.Errorf("commit: %w", err) - } - } - if r, ok := j.(LogResulter); ok { - r.LogResult() - } - slog.Info(fmt.Sprintf("%s done %s", name, detail)) - return nil -} - -func (j *BaseJob) PrepareDB(ctx context.Context, db *sql.DB, ev EventPublisher, withTx bool) error { - switch withTx { - case true: - if tx, err := db.BeginTx(ctx, nil); err != nil { - return err - } else { - j.db = tx - j.oDb = &cdb.DB{DB: tx, Session: cdb.NewSession(tx, ev)} - } - case false: - j.db = db - j.oDb = &cdb.DB{DB: db, Session: cdb.NewSession(db, ev)} - } - j.oDb.DBLck = cdb.InitDbLocker(db) - j.ctx = ctx - j.ev = ev - return nil -} - -func (j *BaseJob) DB() cdb.DBOperater { - return j.db -} - -func (j *BaseJob) SetRedis(r *redis.Client) { - j.redis = r -} - -func (j *BaseJob) SetEv(ev EventPublisher) { - j.ev = ev -} - -func (j *BaseJob) Name() string { - return j.name -} - -func (j *BaseJob) Detail() string { - return j.detail -} - -func (j *BaseJob) dbNow() (err error) { - rows, err := j.db.QueryContext(j.ctx, "SELECT NOW()") - if err != nil { - return err - } - if rows == nil { - return fmt.Errorf("no result rows for SELECT NOW()") - } - defer rows.Close() - if !rows.Next() { - return fmt.Errorf("no result rows next for SELECT NOW()") - } - if err := rows.Scan(&j.now); err != nil { - return err - } - return nil -} - -func runOps(ops ...operation) error { - for _, op := range ops { - if op.condition != nil && !op.condition() { - continue - } - begin := time.Now() - err := op.do() - duration := time.Since(begin) - if err != nil { - operationDuration. - With(prometheus.Labels{"desc": op.desc, "status": operationStatusFailed}). - Observe(duration.Seconds()) - if op.blocking { - return err - } - slog.Warn("%s: non blocking error: %s", op.desc, err) - continue - } - operationDuration. - With(prometheus.Labels{"desc": op.desc, "status": operationStatusOk}). - Observe(duration.Seconds()) - slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", op.desc, duration)) - } - return nil -} - -func (j *BaseJob) dropPending() error { - if err := j.redis.HDel(j.ctx, j.cachePendingH, j.cachePendingIDX).Err(); err != nil { - return fmt.Errorf("dropPending: HDEL %s %s: %w", j.cachePendingH, j.cachePendingIDX, err) - } - return nil -} - -func (d *BaseJob) pushFromTableChanges() error { - return d.oDb.Session.NotifyChanges(d.ctx) -} - -// populateFeedObjectConfigForClusterIDH HSET FeedObjectConfigForClusterIDH with the names of objects -// without config or HDEL FeedObjectConfigForClusterIDH if there are no missing configs. -func (d *BaseJob) populateFeedObjectConfigForClusterIDH(clusterID string, byObjectID map[string]*cdb.DBObject) ([]string, error) { - needConfig := make(map[string]struct{}) - for _, obj := range byObjectID { - if obj.NullConfig { - objName := obj.Svcname - // TODO: import om3 naming ? - if strings.Contains(objName, "/svc/") || - strings.Contains(objName, "/vol/") || - strings.HasPrefix(objName, "svc/") || - strings.HasPrefix(objName, "vol/") || - !strings.Contains(objName, "/") { - needConfig[objName] = struct{}{} - } - } - } - - keyName := cachekeys.FeedObjectConfigForClusterIDH - - if len(needConfig) > 0 { - l := make([]string, 0, len(needConfig)) - for k := range needConfig { - l = append(l, k) - } - if err := d.redis.HSet(d.ctx, keyName, clusterID, strings.Join(l, " ")).Err(); err != nil { - return l, fmt.Errorf("populateFeedObjectConfigForClusterIDH: HSet %s %s: %w", keyName, clusterID, err) - } - return l, nil - } else { - if err := d.redis.HDel(d.ctx, keyName, clusterID).Err(); err != nil { - return nil, fmt.Errorf("populateFeedObjectConfigForClusterIDH: HDEL %s %s: %w", keyName, clusterID, err) - } - } - return nil, nil -} - -func (d *BaseJob) instanceStatusUpdate(objName string, nodename string, iStatus *instanceData) error { - slog.Debug(fmt.Sprintf("updating instance status %s@%s (%s@%s)", objName, nodename, iStatus.SvcID, iStatus.NodeID)) - if err := d.oDb.InstanceStatusUpdate(d.ctx, &iStatus.DBInstanceStatus); err != nil { - return fmt.Errorf("update instance status: %w", err) - } - slog.Debug(fmt.Sprintf("instanceStatusUpdate updating status log %s@%s (%s@%s)", objName, nodename, iStatus.SvcID, iStatus.NodeID)) - err := d.oDb.InstanceStatusLogUpdate(d.ctx, &iStatus.DBInstanceStatus) - if err != nil { - return fmt.Errorf("update instance status log: %w", err) - } - return nil -} - -func (d *BaseJob) instanceResourceUpdate(objName string, nodename string, iStatus *instanceData) error { - for _, res := range iStatus.InstanceResources() { - slog.Debug(fmt.Sprintf("updating instance resource %s@%s %s (%s@%s)", objName, nodename, res.RID, iStatus.SvcID, iStatus.NodeID)) - if err := d.oDb.InstanceResourceUpdate(d.ctx, res); err != nil { - return fmt.Errorf("update resource %s: %w", res.RID, err) - } - slog.Debug(fmt.Sprintf("updating instance resource log %s@%s %s (%s@%s)", objName, nodename, res.RID, iStatus.SvcID, iStatus.NodeID)) - if err := d.oDb.InstanceResourceLogUpdate(d.ctx, res); err != nil { - return fmt.Errorf("update resource log %s: %w", res.RID, err) - } - } - return nil -} diff --git a/worker/job_feed_daemon_ping.go b/worker/job_feed_daemon_ping.go index 79dc257..17de815 100644 --- a/worker/job_feed_daemon_ping.go +++ b/worker/job_feed_daemon_ping.go @@ -1,6 +1,7 @@ package worker import ( + "context" "encoding/json" "fmt" "log/slog" @@ -12,7 +13,9 @@ import ( type ( jobFeedDaemonPing struct { - *BaseJob + JobBase + JobRedis + JobDB nodeID string clusterID string @@ -31,10 +34,11 @@ type ( func newDaemonPing(nodeID string) *jobFeedDaemonPing { return &jobFeedDaemonPing{ - BaseJob: &BaseJob{ + JobBase: JobBase{ name: "daemonPing", detail: "nodeID: " + nodeID, - + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedDaemonPingPendingH, cachePendingIDX: nodeID, }, @@ -50,20 +54,20 @@ func newDaemonPing(nodeID string) *jobFeedDaemonPing { func (d *jobFeedDaemonPing) Operations() []operation { return []operation{ - {desc: "daemonPing/dropPending", do: d.dropPending}, - {desc: "daemonPing/getData", do: d.getData}, - {desc: "daemonPing/dbFetchNodes", do: d.dbFetchNodes}, - {desc: "daemonPing/dbFetchObjects", do: d.dbFetchObjects}, - {desc: "daemonPing/dbPingInstances", do: d.dbPingInstances}, - {desc: "daemonPing/dbPingObjects", do: d.dbPingObjects}, - {desc: "daemonPing/cacheObjectsWithoutConfig", do: d.cacheObjectsWithoutConfig}, - {desc: "daemonPing/pushFromTableChanges", do: d.pushFromTableChanges}, + {desc: "daemonPing/dropPending", doCtx: d.dropPending}, + {desc: "daemonPing/getData", doCtx: d.getData}, + {desc: "daemonPing/dbFetchNodes", doCtx: d.dbFetchNodes}, + {desc: "daemonPing/dbFetchObjects", doCtx: d.dbFetchObjects}, + {desc: "daemonPing/dbPingInstances", doCtx: d.dbPingInstances}, + {desc: "daemonPing/dbPingObjects", doCtx: d.dbPingObjects}, + {desc: "daemonPing/cacheObjectsWithoutConfig", doCtx: d.cacheObjectsWithoutConfig}, + {desc: "daemonPing/pushFromTableChanges", doCtx: d.pushFromTableChanges}, } } -func (d *jobFeedDaemonPing) getData() error { +func (d *jobFeedDaemonPing) getData(ctx context.Context) error { var data api.PostFeedDaemonPing - if b, err := d.redis.HGet(d.ctx, cachekeys.FeedDaemonPingH, d.nodeID).Bytes(); err != nil { + if b, err := d.redis.HGet(ctx, cachekeys.FeedDaemonPingH, d.nodeID).Bytes(); err != nil { return fmt.Errorf("getData: HGET %s %s: %w", cachekeys.FeedDaemonPingH, d.nodeID, err) } else if err = json.Unmarshal(b, &data); err != nil { return fmt.Errorf("getData: unexpected data from %s %s: %w", cachekeys.FeedDaemonPingH, d.nodeID, err) @@ -81,11 +85,11 @@ func (d *jobFeedDaemonPing) getData() error { // dbFetchNodes fetch nodes (that are associated with caller node ID) from database // and sets d.byNodeID and d.clusterID. -func (d *jobFeedDaemonPing) dbFetchNodes() (err error) { +func (d *jobFeedDaemonPing) dbFetchNodes(ctx context.Context) (err error) { var ( dbNodes []*cdb.DBNode ) - if dbNodes, err = d.oDb.ClusterNodesFromNodeID(d.ctx, d.nodeID); err != nil { + if dbNodes, err = d.oDb.ClusterNodesFromNodeID(ctx, d.nodeID); err != nil { return fmt.Errorf("dbFetchNodes %s: %w", d.nodeID, err) } for _, n := range dbNodes { @@ -104,11 +108,11 @@ func (d *jobFeedDaemonPing) dbFetchNodes() (err error) { return nil } -func (d *jobFeedDaemonPing) dbFetchObjects() (err error) { +func (d *jobFeedDaemonPing) dbFetchObjects(ctx context.Context) (err error) { var ( objects []*cdb.DBObject ) - if objects, err = d.oDb.ObjectsFromClusterID(d.ctx, d.clusterID); err != nil { + if objects, err = d.oDb.ObjectsFromClusterID(ctx, d.clusterID); err != nil { return fmt.Errorf("dbFetchObjects query node %s (%s) clusterID: %s: %w", d.callerNode.Nodename, d.nodeID, d.clusterID, err) } @@ -124,9 +128,9 @@ func (d *jobFeedDaemonPing) dbFetchObjects() (err error) { } // dbPingInstances call oDb.InstancePingFromNodeID for all db fetched nodes -func (d *jobFeedDaemonPing) dbPingInstances() error { +func (d *jobFeedDaemonPing) dbPingInstances(ctx context.Context) error { for nodeID := range d.byNodeID { - if ok, err := d.oDb.InstancePingFromNodeID(d.ctx, nodeID); err != nil { + if ok, err := d.oDb.InstancePingFromNodeID(ctx, nodeID); err != nil { return fmt.Errorf("dbPingInstances: %w", err) } else if ok { continue @@ -136,12 +140,12 @@ func (d *jobFeedDaemonPing) dbPingInstances() error { } // dbPingObjects call oDb.objectPing for all db fetched objects -func (d *jobFeedDaemonPing) dbPingObjects() (err error) { +func (d *jobFeedDaemonPing) dbPingObjects(ctx context.Context) (err error) { for objectID, obj := range d.byObjectID { objectName := obj.Svcname if obj.AvailStatus != "undef" { slog.Debug(fmt.Sprintf("ping svc %s %s", objectName, objectID)) - if _, err := d.oDb.ObjectPing(d.ctx, objectID); err != nil { + if _, err := d.oDb.ObjectPing(ctx, objectID); err != nil { return fmt.Errorf("dbPingObjects can't ping object %s %s: %w", objectName, objectID, err) } } @@ -150,8 +154,8 @@ func (d *jobFeedDaemonPing) dbPingObjects() (err error) { } // cacheObjectsWithoutConfig populate FeedObjectConfigForClusterIDH with names of objects without config -func (d *jobFeedDaemonPing) cacheObjectsWithoutConfig() error { - objects, err := d.populateFeedObjectConfigForClusterIDH(d.clusterID, d.byObjectID) +func (d *jobFeedDaemonPing) cacheObjectsWithoutConfig(ctx context.Context) error { + objects, err := d.populateFeedObjectConfigForClusterIDH(ctx, d.clusterID, d.byObjectID) if len(objects) > 0 { slog.Info(fmt.Sprintf("daemonPing nodeID: %s need object config: %s", d.nodeID, objects)) } diff --git a/worker/job_feed_daemon_status.go b/worker/job_feed_daemon_status.go index b3589b4..ebd1afb 100644 --- a/worker/job_feed_daemon_status.go +++ b/worker/job_feed_daemon_status.go @@ -1,6 +1,7 @@ package worker import ( + "context" "encoding/json" "errors" "fmt" @@ -49,7 +50,9 @@ type ( } jobFeedDaemonStatus struct { - *BaseJob + JobBase + JobRedis + JobDB nodeID string clusterID string @@ -82,10 +85,11 @@ type ( func newDaemonStatus(nodeID string) *jobFeedDaemonStatus { return &jobFeedDaemonStatus{ - BaseJob: &BaseJob{ + JobBase: JobBase{ name: "daemonStatus", detail: "nodeID: " + nodeID, - + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedDaemonStatusPendingH, cachePendingIDX: nodeID, }, @@ -107,25 +111,25 @@ func newDaemonStatus(nodeID string) *jobFeedDaemonStatus { func (d *jobFeedDaemonStatus) Operations() []operation { return []operation{ - {desc: "daemonStatus/dropPending", do: d.dropPending}, - {desc: "daemonStatus/dbNow", do: d.dbNow}, - {desc: "daemonStatus/getChanges", do: d.getChanges}, - {desc: "daemonStatus/getData", do: d.getData}, - {desc: "daemonStatus/dbCheckClusterIDForNodeID", do: d.dbCheckClusterIDForNodeID}, - {desc: "daemonStatus/dbCheckClusters", do: d.dbCheckClusters}, - {desc: "daemonStatus/dbFindNodes", do: d.dbFindNodes}, - {desc: "daemonStatus/dataToNodeFrozen", do: d.dataToNodeFrozen}, + {desc: "daemonStatus/dropPending", doCtx: d.dropPending}, + {desc: "daemonStatus/dbNow", doCtx: d.dbNow}, + {desc: "daemonStatus/getChanges", doCtx: d.getChanges}, + {desc: "daemonStatus/getData", doCtx: d.getData}, + {desc: "daemonStatus/dbCheckClusterIDForNodeID", doCtx: d.dbCheckClusterIDForNodeID}, + {desc: "daemonStatus/dbCheckClusters", doCtx: d.dbCheckClusters}, + {desc: "daemonStatus/dbFindNodes", doCtx: d.dbFindNodes}, + {desc: "daemonStatus/dataToNodeFrozen", doCtx: d.dataToNodeFrozen}, {desc: "daemonStatus/dataToNodeHeartbeat", do: d.dataToNodeHeartbeat}, - {desc: "daemonStatus/heartbeatToDB", do: d.heartbeatToDB}, - {desc: "daemonStatus/dbFindServices", do: d.dbFindServices}, - {desc: "daemonStatus/dbCreateServices", do: d.dbCreateServices}, - {desc: "daemonStatus/dbFindInstances", do: d.dbFindInstances}, - {desc: "daemonStatus/dbUpdateServices", do: d.dbUpdateServices}, - {desc: "daemonStatus/dbUpdateInstances", do: d.dbUpdateInstances}, - {desc: "daemonStatus/dbPurgeInstances", do: d.dbPurgeInstances}, - {desc: "daemonStatus/dbPurgeServices", do: d.dbPurgeServices}, - {desc: "daemonStatus/cacheObjectsWithoutConfig", do: d.cacheObjectsWithoutConfig}, - {desc: "daemonStatus/pushFromTableChanges", do: d.pushFromTableChanges}, + {desc: "daemonStatus/heartbeatToDB", doCtx: d.heartbeatToDB}, + {desc: "daemonStatus/dbFindServices", doCtx: d.dbFindServices}, + {desc: "daemonStatus/dbCreateServices", doCtx: d.dbCreateServices}, + {desc: "daemonStatus/dbFindInstances", doCtx: d.dbFindInstances}, + {desc: "daemonStatus/dbUpdateServices", doCtx: d.dbUpdateServices}, + {desc: "daemonStatus/dbUpdateInstances", doCtx: d.dbUpdateInstances}, + {desc: "daemonStatus/dbPurgeInstances", doCtx: d.dbPurgeInstances}, + {desc: "daemonStatus/dbPurgeServices", doCtx: d.dbPurgeServices}, + {desc: "daemonStatus/cacheObjectsWithoutConfig", doCtx: d.cacheObjectsWithoutConfig}, + {desc: "daemonStatus/pushFromTableChanges", doCtx: d.pushFromTableChanges}, } } @@ -144,8 +148,8 @@ func (d *jobFeedDaemonStatus) LogResult() { } } -func (d *jobFeedDaemonStatus) getChanges() error { - s, err := d.redis.HGet(d.ctx, cachekeys.FeedDaemonStatusChangesH, d.nodeID).Result() +func (d *jobFeedDaemonStatus) getChanges(ctx context.Context) error { + s, err := d.redis.HGet(ctx, cachekeys.FeedDaemonStatusChangesH, d.nodeID).Result() if err == nil { // TODO: fix possible race: // worker iteration 1: pickup changes 'a' @@ -155,7 +159,7 @@ func (d *jobFeedDaemonStatus) getChanges() error { // worker iteration 1: delete changes the 'b' => 'b' change is lost // worker iteration 1: ... done // worker iteration 2: pickup changes: empty instead of expected 'b' - if err := d.redis.HDel(d.ctx, cachekeys.FeedDaemonStatusChangesH, d.nodeID).Err(); err != nil { + if err := d.redis.HDel(ctx, cachekeys.FeedDaemonStatusChangesH, d.nodeID).Err(); err != nil { return fmt.Errorf("getChanges: HDEL %s %s: %w", cachekeys.FeedDaemonStatusChangesH, d.nodeID, err) } } else if !errors.Is(err, redis.Nil) { @@ -168,12 +172,12 @@ func (d *jobFeedDaemonStatus) getChanges() error { return nil } -func (d *jobFeedDaemonStatus) getData() error { +func (d *jobFeedDaemonStatus) getData(ctx context.Context) error { var ( err error data map[string]any ) - if b, err := d.redis.HGet(d.ctx, cachekeys.FeedDaemonStatusH, d.nodeID).Bytes(); err != nil { + if b, err := d.redis.HGet(ctx, cachekeys.FeedDaemonStatusH, d.nodeID).Bytes(); err != nil { return fmt.Errorf("getData: HGET %s %s: %w", cachekeys.FeedDaemonStatusH, d.nodeID, err) } else if err = json.Unmarshal(b, &data); err != nil { return fmt.Errorf("getData: unexpected data from %s %s: %w", cachekeys.FeedDaemonStatusH, d.nodeID, err) @@ -199,8 +203,8 @@ func (d *jobFeedDaemonStatus) getData() error { return nil } -func (d *jobFeedDaemonStatus) dbCheckClusterIDForNodeID() error { - if ok, err := d.oDb.NodeUpdateClusterIDForNodeID(d.ctx, d.nodeID, d.clusterID); err != nil { +func (d *jobFeedDaemonStatus) dbCheckClusterIDForNodeID(ctx context.Context) error { + if ok, err := d.oDb.NodeUpdateClusterIDForNodeID(ctx, d.nodeID, d.clusterID); err != nil { return fmt.Errorf("dbCheckClusterIDForNodeID for %s (%s): %w", d.callerNode.Nodename, d.nodeID, err) } else if ok { slog.Info("dbCheckClusterIDForNodeID change cluster id value") @@ -208,14 +212,14 @@ func (d *jobFeedDaemonStatus) dbCheckClusterIDForNodeID() error { return nil } -func (d *jobFeedDaemonStatus) dbCheckClusters() error { - if err := d.oDb.UpdateClustersData(d.ctx, d.clusterName, d.clusterName, string(d.rawData)); err != nil { +func (d *jobFeedDaemonStatus) dbCheckClusters(ctx context.Context) error { + if err := d.oDb.UpdateClustersData(ctx, d.clusterName, d.clusterName, string(d.rawData)); err != nil { return fmt.Errorf("dbCheckClusters %s (%s): %w", d.nodeID, d.clusterID, err) } return nil } -func (d *jobFeedDaemonStatus) dbFindNodes() (err error) { +func (d *jobFeedDaemonStatus) dbFindNodes(ctx context.Context) (err error) { var ( nodes []string dbNodes []*cdb.DBNode @@ -223,7 +227,7 @@ func (d *jobFeedDaemonStatus) dbFindNodes() (err error) { // search caller node from its node_id: we can't trust yet search from // d.data.nodeNames() because initial push daemon status may omit caller node. - if callerNode, err := d.oDb.NodeByNodeID(d.ctx, d.nodeID); err != nil { + if callerNode, err := d.oDb.NodeByNodeID(ctx, d.nodeID); err != nil { return fmt.Errorf("dbFindNodes nodeByNodeID %s: %s", d.nodeID, err) } else if callerNode == nil { return fmt.Errorf("dbFindNodes can't find caller node %s", d.nodeID) @@ -243,7 +247,7 @@ func (d *jobFeedDaemonStatus) dbFindNodes() (err error) { if len(nodes) == 0 { return fmt.Errorf("dbFindNodes: empty nodes for %s", d.nodeID) } - if dbNodes, err = d.oDb.NodesFromClusterIDWithNodenames(d.ctx, d.clusterID, nodes); err != nil { + if dbNodes, err = d.oDb.NodesFromClusterIDWithNodenames(ctx, d.clusterID, nodes); err != nil { return fmt.Errorf("dbFindNodes %s [%s]: %w", nodes, d.nodeID, err) } for _, n := range dbNodes { @@ -267,7 +271,7 @@ func (d *jobFeedDaemonStatus) dbFindNodes() (err error) { return nil } -func (d *jobFeedDaemonStatus) dataToNodeFrozen() error { +func (d *jobFeedDaemonStatus) dataToNodeFrozen(ctx context.Context) error { for nodeID, dbNode := range d.byNodeID { nodename := dbNode.Nodename frozen, err := d.data.nodeFrozen(nodename) @@ -281,7 +285,7 @@ func (d *jobFeedDaemonStatus) dataToNodeFrozen() error { } if frozen != dbNode.Frozen { slog.Info(fmt.Sprintf("dataToNodeFrozen: updating node %s: %s frozen from %s -> %s", nodename, nodeID, dbNode.Frozen, frozen)) - if err := d.oDb.NodeUpdateFrozen(d.ctx, nodeID, frozen); err != nil { + if err := d.oDb.NodeUpdateFrozen(ctx, nodeID, frozen); err != nil { return fmt.Errorf("dataToNodeFrozen node %s (%s): %w", nodename, dbNode.NodeID, err) } } @@ -316,24 +320,24 @@ func (d *jobFeedDaemonStatus) dataToNodeHeartbeat() error { return nil } -func (d *jobFeedDaemonStatus) heartbeatToDB() error { +func (d *jobFeedDaemonStatus) heartbeatToDB(ctx context.Context) error { //now := time.Now() for _, hb := range d.heartbeats { slog.Debug(fmt.Sprintf("inserting: %s", hb)) - if err := d.oDb.HBUpdate(d.ctx, hb.DBHeartbeat); err != nil { + if err := d.oDb.HBUpdate(ctx, hb.DBHeartbeat); err != nil { return fmt.Errorf("1 heartbeatToDB hbUpdate %s: %w", hb, err) } - if err := d.oDb.HBLogUpdate(d.ctx, hb.DBHeartbeat); err != nil { + if err := d.oDb.HBLogUpdate(ctx, hb.DBHeartbeat); err != nil { return fmt.Errorf("heartbeatToDB hbLogUpdate %s: %w", hb, err) } } - if err := d.oDb.HBDeleteOutDatedByClusterID(d.ctx, d.clusterID, d.now); err != nil { + if err := d.oDb.HBDeleteOutDatedByClusterID(ctx, d.clusterID, d.now); err != nil { return fmt.Errorf("heartbeatToDB purge outdated %s: %w", d.clusterID, err) } return nil } -func (d *jobFeedDaemonStatus) dbFindServices() error { +func (d *jobFeedDaemonStatus) dbFindServices(ctx context.Context) error { var ( objects []*cdb.DBObject ) @@ -345,7 +349,7 @@ func (d *jobFeedDaemonStatus) dbFindServices() error { slog.Info(fmt.Sprintf("dbFindServices: no services for %s", d.nodeID)) return nil } - if objects, err = d.oDb.ObjectsFromClusterIDAndObjectNames(d.ctx, d.clusterID, objectNames); err != nil { + if objects, err = d.oDb.ObjectsFromClusterIDAndObjectNames(ctx, d.clusterID, objectNames); err != nil { return fmt.Errorf("dbFindServices query nodeID: %s clusterID: %s [%s]: %w", d.nodeID, d.clusterID, objectNames, err) } for _, o := range objects { @@ -356,7 +360,7 @@ func (d *jobFeedDaemonStatus) dbFindServices() error { return nil } -func (d *jobFeedDaemonStatus) dbFindInstances() error { +func (d *jobFeedDaemonStatus) dbFindInstances(ctx context.Context) error { var ( objectIDs = make([]string, 0) ) @@ -366,7 +370,7 @@ func (d *jobFeedDaemonStatus) dbFindInstances() error { for objectID := range d.byObjectID { objectIDs = append(objectIDs, objectID) } - instances, err := d.oDb.InstancesFromObjectIDs(d.ctx, objectIDs...) + instances, err := d.oDb.InstancesFromObjectIDs(ctx, objectIDs...) if err != nil { return fmt.Errorf("dbFindInstances: %w", err) } @@ -387,7 +391,7 @@ func (d *jobFeedDaemonStatus) dbFindInstances() error { } // dbCreateServices creates missing services -func (d *jobFeedDaemonStatus) dbCreateServices() error { +func (d *jobFeedDaemonStatus) dbCreateServices(ctx context.Context) error { objectNames, err := d.data.objectNames() if err != nil { return fmt.Errorf("dbCreateServices: %w", err) @@ -406,7 +410,7 @@ func (d *jobFeedDaemonStatus) dbCreateServices() error { for _, objectName := range missing { app := d.data.appFromObjectName(objectName, d.nodes...) slog.Debug(fmt.Sprintf("dbCreateServices: creating service %s with app %s", objectName, app)) - obj, err := d.oDb.ObjectCreate(d.ctx, objectName, d.clusterID, app, d.byNodeID[d.nodeID]) + obj, err := d.oDb.ObjectCreate(ctx, objectName, d.clusterID, app, d.byNodeID[d.nodeID]) if err != nil { return fmt.Errorf("dbCreateServices objectCreate %s: %w", objectName, err) } @@ -417,7 +421,7 @@ func (d *jobFeedDaemonStatus) dbCreateServices() error { return nil } -func (d *jobFeedDaemonStatus) dbUpdateServices() error { +func (d *jobFeedDaemonStatus) dbUpdateServices(ctx context.Context) error { for objectID, obj := range d.byObjectID { objectName := obj.Svcname _, isChanged := d.changes[objectName] @@ -425,18 +429,18 @@ func (d *jobFeedDaemonStatus) dbUpdateServices() error { // even if not present in changes if !isChanged && obj.AvailStatus != "undef" { slog.Debug(fmt.Sprintf("ping svc %s %s", objectName, objectID)) - if _, err := d.oDb.ObjectPing(d.ctx, objectID); err != nil { + if _, err := d.oDb.ObjectPing(ctx, objectID); err != nil { return fmt.Errorf("dbUpdateServices can't ping object %s %s: %w", objectName, objectID, err) } } else { oStatus := d.data.objectStatus(objectName) if oStatus != nil { slog.Debug(fmt.Sprintf("update svc log %s %s %#v", objectName, objectID, oStatus)) - if err := d.oDb.ObjectUpdateLog(d.ctx, objectID, oStatus.AvailStatus); err != nil { + if err := d.oDb.ObjectUpdateLog(ctx, objectID, oStatus.AvailStatus); err != nil { return fmt.Errorf("dbUpdateServices can't update object log %s %s: %w", objectName, objectID, err) } slog.Debug(fmt.Sprintf("update svc %s %s %#v", objectName, objectID, *oStatus)) - if err := d.oDb.ObjectUpdateStatus(d.ctx, objectID, oStatus); err != nil { + if err := d.oDb.ObjectUpdateStatus(ctx, objectID, oStatus); err != nil { return fmt.Errorf("dbUpdateServices can't update object %s %s: %w", objectName, objectID, err) } if d.byObjectID[objectID].AvailStatus != oStatus.AvailStatus { @@ -450,7 +454,7 @@ func (d *jobFeedDaemonStatus) dbUpdateServices() error { return nil } -func (d *jobFeedDaemonStatus) dbUpdateInstances() error { +func (d *jobFeedDaemonStatus) dbUpdateInstances(ctx context.Context) error { for objectName, obj := range d.byObjectName { beginObj := time.Now() objID := obj.SvcID @@ -466,7 +470,7 @@ func (d *jobFeedDaemonStatus) dbUpdateInstances() error { continue } // set iStatus svcID and nodeID for db update - err := d.dbUpdateInstance(iStatus, objID, nodeID, objectName, nodename, obj, instanceMonitorStates, node, beginInstance, d.changes) + err := d.dbUpdateInstance(ctx, iStatus, objID, nodeID, objectName, nodename, obj, instanceMonitorStates, node, beginInstance, d.changes) if err != nil { return err } @@ -476,22 +480,22 @@ func (d *jobFeedDaemonStatus) dbUpdateInstances() error { var remove bool remove = slices.Contains([]string{"up", "n/a"}, obj.AvailStatus) - if err := d.updateDashboardObject(obj, remove, &DashboardObjectUnavailable{obj: obj}); err != nil { + if err := d.updateDashboardObject(ctx, obj, remove, &DashboardObjectUnavailable{obj: obj}); err != nil { return fmt.Errorf("dbUpdateInstances on %s (%s): %w", objID, objectName, err) } remove = slices.Contains([]string{"optimal", "n/a"}, obj.Placement) - if err := d.updateDashboardObject(obj, remove, &DashboardObjectPlacement{obj: obj}); err != nil { + if err := d.updateDashboardObject(ctx, obj, remove, &DashboardObjectPlacement{obj: obj}); err != nil { return fmt.Errorf("dbUpdateInstances on %s (%s): %w", objID, objectName, err) } remove = slices.Contains([]string{"up", "n/a"}, obj.AvailStatus) && slices.Contains([]string{"up", "n/a"}, obj.OverallStatus) - if err := d.updateDashboardObject(obj, remove, &DashboardObjectDegraded{obj: obj}); err != nil { + if err := d.updateDashboardObject(ctx, obj, remove, &DashboardObjectDegraded{obj: obj}); err != nil { return fmt.Errorf("dbUpdateInstances on %s (%s): %w", objID, objectName, err) } sev := severityFromEnv(dashObjObjectFlexError, obj.Env) - if err := d.oDb.DashboardUpdateObjectFlexStarted(d.ctx, obj, sev); err != nil { + if err := d.oDb.DashboardUpdateObjectFlexStarted(ctx, obj, sev); err != nil { return fmt.Errorf("dbUpdateInstances %s (%s): %w", objID, objectName, err) } // Dropped feature: update_dash_flex_cpu @@ -503,7 +507,7 @@ func (d *jobFeedDaemonStatus) dbUpdateInstances() error { return nil } -func (d *jobFeedDaemonStatus) dbPurgeInstances() error { +func (d *jobFeedDaemonStatus) dbPurgeInstances(ctx context.Context) error { var nodeIDs, objectNames []string for objectName := range d.byObjectName { objectNames = append(objectNames, objectName) @@ -511,12 +515,12 @@ func (d *jobFeedDaemonStatus) dbPurgeInstances() error { for nodeID := range d.byNodeID { nodeIDs = append(nodeIDs, nodeID) } - instanceIDs, err := d.oDb.GetOrphanInstances(d.ctx, nodeIDs, objectNames) + instanceIDs, err := d.oDb.GetOrphanInstances(ctx, nodeIDs, objectNames) if err != nil { return fmt.Errorf("dbPurgeInstances: getOrphanInstances: %w", err) } for _, instanceID := range instanceIDs { - if err1 := d.oDb.PurgeInstance(d.ctx, instanceID); err1 != nil { + if err1 := d.oDb.PurgeInstance(ctx, instanceID); err1 != nil { err = errors.Join(err, fmt.Errorf("purge instance %v: %w", instanceID, err1)) } } @@ -526,14 +530,14 @@ func (d *jobFeedDaemonStatus) dbPurgeInstances() error { return nil } -func (d *jobFeedDaemonStatus) dbPurgeServices() error { - objectIDs, err := d.oDb.ObjectIDsFromClusterIDWithPurgeTag(d.ctx, d.clusterID) +func (d *jobFeedDaemonStatus) dbPurgeServices(ctx context.Context) error { + objectIDs, err := d.oDb.ObjectIDsFromClusterIDWithPurgeTag(ctx, d.clusterID) if err != nil { err = fmt.Errorf("dbPurgeServices objectIDsFromClusterIDWithPurgeTag: %w", err) return err } for _, objectID := range objectIDs { - if err1 := d.oDb.PurgeTablesFromObjectID(d.ctx, objectID); err1 != nil { + if err1 := d.oDb.PurgeTablesFromObjectID(ctx, objectID); err1 != nil { err = errors.Join(err, fmt.Errorf("purge object %s: %w", objectID, err1)) } } @@ -544,118 +548,10 @@ func (d *jobFeedDaemonStatus) dbPurgeServices() error { } // cacheObjectsWithoutConfig populate FeedObjectConfigForClusterIDH with names of objects without config -func (d *jobFeedDaemonStatus) cacheObjectsWithoutConfig() error { - objects, err := d.populateFeedObjectConfigForClusterIDH(d.clusterID, d.byObjectID) +func (d *jobFeedDaemonStatus) cacheObjectsWithoutConfig(ctx context.Context) error { + objects, err := d.populateFeedObjectConfigForClusterIDH(ctx, d.clusterID, d.byObjectID) if len(objects) > 0 { slog.Info(fmt.Sprintf("daemonStatus nodeID: %s need object config: %s", d.nodeID, objects)) } return err } - -func logDuration(s string, begin time.Time) { - slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", s, time.Since(begin))) -} - -func logDurationInfo(s string, begin time.Time) { - slog.Info(fmt.Sprintf("STAT: %s elapse: %s", s, time.Since(begin))) -} - -func (d *BaseJob) dbUpdateInstance(iStatus *instanceData, objID string, nodeID string, objectName string, nodename string, obj *cdb.DBObject, instanceMonitorStates map[string]bool, node *cdb.DBNode, beginInstance time.Time, changes map[string]struct{}) error { - iStatus.SvcID = objID - iStatus.NodeID = nodeID - _, isChanged := changes[objectName+"@"+nodename] - if !isChanged && obj.AvailStatus != "undef" { - slog.Debug(fmt.Sprintf("ping instance %s@%s", objectName, nodename)) - changes, err := d.oDb.InstancePing(d.ctx, objID, nodeID) - if err != nil { - return fmt.Errorf("dbUpdateInstances can't ping instance %s@%s: %w", objectName, nodename, err) - } else if changes { - // the instance already existed, and the updated tstamp has been refreshed - // skip the inserts/updates - return nil - } - } - instanceMonitorStates[iStatus.MonSmonStatus] = true - if iStatus.encap == nil { - subNodeID, _, _, err := d.oDb.TranslateEncapNodename(d.ctx, objID, nodeID) - if err != nil { - return err - } - if subNodeID != "" && subNodeID != nodeID { - slog.Debug(fmt.Sprintf("dbUpdateInstances skip for %s@%s subNodeID:%s vs nodeID: %subNodeID", objectName, nodename, subNodeID, nodeID)) - return nil - } - if iStatus.resources == nil { - // scaler or wrapper, for example - if err := d.oDb.InstanceDeleteStatus(d.ctx, objID, nodeID); err != nil { - return fmt.Errorf("dbUpdateInstances delete status %s@%s: %w", objID, nodeID, err) - } - if err := d.oDb.InstanceResourcesDelete(d.ctx, objID, nodeID); err != nil { - return fmt.Errorf("dbUpdateInstances delete resources %s@%s: %w", objID, nodeID, err) - } - } else { - if err := d.instanceStatusUpdate(objectName, nodename, iStatus); err != nil { - return fmt.Errorf("dbUpdateInstances update status %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) - } - if err := d.instanceResourceUpdate(objectName, nodename, iStatus); err != nil { - return fmt.Errorf("dbUpdateInstances update resource %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) - } - slog.Debug(fmt.Sprintf("dbUpdateInstances deleting obsolete resources %s@%s", objectName, nodename)) - if err := d.oDb.InstanceResourcesDeleteObsolete(d.ctx, objID, nodeID, d.now); err != nil { - return fmt.Errorf("dbUpdateInstances delete obsolete resources %s@%s: %w", objID, nodeID, err) - } - } - } else { - if iStatus.resources == nil { - // scaler or wrapper, for example - if err := d.oDb.InstanceDeleteStatus(d.ctx, objID, nodeID); err != nil { - return fmt.Errorf("dbUpdateInstances delete status %s@%s: %w", objID, nodeID, err) - } - if err := d.oDb.InstanceResourcesDelete(d.ctx, objID, nodeID); err != nil { - return fmt.Errorf("dbUpdateInstances delete resources %s@%s: %w", objID, nodeID, err) - } - } else { - for _, containerStatus := range iStatus.Containers() { - slog.Debug(fmt.Sprintf("dbUpdateInstances from container status %s@%s monVmName: %s monVmType: %s", objectName, nodename, containerStatus.MonVmName, containerStatus.MonVmType)) - if containerStatus == nil { - continue - } - if containerStatus.fromOutsideStatus == "up" { - slog.Debug(fmt.Sprintf("dbUpdateInstances nodeContainerUpdateFromParentNode %s@%s encap hostname %s", - objID, nodeID, containerStatus.MonVmName)) - if err := d.oDb.NodeContainerUpdateFromParentNode(d.ctx, containerStatus.MonVmName, obj.App, node); err != nil { - return fmt.Errorf("dbUpdateInstances nodeContainerUpdateFromParentNode %s@%s encap hostname %s: %w", - objID, nodeID, containerStatus.MonVmName, err) - } - } - - if err := d.instanceStatusUpdate(objID, nodeID, containerStatus); err != nil { - return fmt.Errorf("dbUpdateInstances update container %s %s@%s (%s@%s): %w", - containerStatus.MonVmName, objID, nodeID, objectName, nodename, err) - } - if err := d.instanceResourceUpdate(objectName, nodename, iStatus); err != nil { - return fmt.Errorf("dbUpdateInstances update resource %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) - } - } - slog.Debug(fmt.Sprintf("dbUpdateInstances deleting obsolete container resources %s@%s", objectName, nodename)) - if err := d.oDb.InstanceResourcesDeleteObsolete(d.ctx, objID, nodeID, d.now); err != nil { - return fmt.Errorf("dbUpdateInstances delete obsolete container resources %s@%s: %w", objID, nodeID, err) - } - } - } - if err := d.oDb.DashboardInstanceFrozenUpdate(d.ctx, objID, nodeID, obj.Env, iStatus.MonFrozen > 0); err != nil { - return fmt.Errorf("dbUpdateInstances update dashboard instance frozen %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) - } - if err := d.oDb.DashboardDeleteInstanceNotUpdated(d.ctx, objID, nodeID); err != nil { - return fmt.Errorf("dbUpdateInstances update dashboard instance not updated %s@%s (%s@%s): %w", objectName, nodename, objID, nodeID, err) - } - // TODO: verify if we need a placement non optimal alert for object/instance - // om2 has: monitor.services.''.placement = non-optimal - // om3 has: cluster.object..placement_state = non-optimal - // cluster.node..instance..monitor.is_ha_leader - // cluster.node..instance..monitor.is_leader - // collector v2 calls update_dash_service_not_on_primary (broken since no DEFAULT.autostart_node values) - - slog.Debug(fmt.Sprintf("STAT: dbUpdateInstances instance duration %s@%s %s", objectName, nodename, time.Since(beginInstance))) - return nil -} diff --git a/worker/job_feed_instance_resource_info.go b/worker/job_feed_instance_resource_info.go index 33221ca..b810044 100644 --- a/worker/job_feed_instance_resource_info.go +++ b/worker/job_feed_instance_resource_info.go @@ -1,6 +1,7 @@ package worker import ( + "context" "encoding/json" "fmt" "log/slog" @@ -13,7 +14,9 @@ import ( type ( jobFeedInstanceResourceInfo struct { - *BaseJob + JobBase + JobRedis + JobDB // idX is the id of the posted instance config with the expected pattern: @@ idX string @@ -37,10 +40,11 @@ type ( func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFeedInstanceResourceInfo { idX := fmt.Sprintf("%s@%s@%s", objectName, nodeID, clusterID) return &jobFeedInstanceResourceInfo{ - BaseJob: &BaseJob{ + JobBase: JobBase{ name: "instanceResourceInfo", detail: "ID: " + idX, - + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedInstanceResourceInfoPendingH, cachePendingIDX: idX, }, @@ -53,17 +57,17 @@ func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFe func (j *jobFeedInstanceResourceInfo) Operations() []operation { return []operation{ - {desc: "instanceResourceInfo/dropPending", do: j.dropPending}, - {desc: "instanceResourceInfo/getData", do: j.getData}, - {desc: "instanceResourceInfo/dbNow", do: j.dbNow}, - {desc: "instanceResourceInfo/updateDB", do: j.updateDB}, - {desc: "instanceResourceInfo/purgeDB", do: j.purgeDB}, - {desc: "instanceResourceInfo/pushFromTableChanges", do: j.pushFromTableChanges}, + {desc: "instanceResourceInfo/dropPending", doCtx: j.dropPending}, + {desc: "instanceResourceInfo/getData", doCtx: j.getData}, + {desc: "instanceResourceInfo/dbNow", doCtx: j.dbNow}, + {desc: "instanceResourceInfo/updateDB", doCtx: j.updateDB}, + {desc: "instanceResourceInfo/purgeDB", doCtx: j.purgeDB}, + {desc: "instanceResourceInfo/pushFromTableChanges", doCtx: j.pushFromTableChanges}, } } -func (j *jobFeedInstanceResourceInfo) getData() error { - cmd := j.redis.HGet(j.ctx, cachekeys.FeedInstanceResourceInfoH, j.idX) +func (j *jobFeedInstanceResourceInfo) getData(ctx context.Context) error { + cmd := j.redis.HGet(ctx, cachekeys.FeedInstanceResourceInfoH, j.idX) result, err := cmd.Result() switch err { case nil: @@ -78,8 +82,8 @@ func (j *jobFeedInstanceResourceInfo) getData() error { return nil } -func (j *jobFeedInstanceResourceInfo) updateDB() (err error) { - created, objectID, err := j.oDb.ObjectIDFindOrCreate(j.ctx, j.objectName, j.clusterID) +func (j *jobFeedInstanceResourceInfo) updateDB(ctx context.Context) (err error) { + created, objectID, err := j.oDb.ObjectIDFindOrCreate(ctx, j.objectName, j.clusterID) if err != nil { return fmt.Errorf("ObjectIDFindOrCreate: %w", err) } @@ -87,7 +91,7 @@ func (j *jobFeedInstanceResourceInfo) updateDB() (err error) { slog.Info(fmt.Sprintf("jobFeedInstanceResourceInfo has created new object id %s@%s %s", j.objectName, j.clusterID, objectID)) } j.objectID = objectID - err = j.oDb.InstanceResourceInfoUpdate(j.ctx, objectID, j.nodeID, j.data) + err = j.oDb.InstanceResourceInfoUpdate(ctx, objectID, j.nodeID, j.data) if err != nil { return fmt.Errorf("InstanceResourceInfoUpdate: %w", err) } @@ -95,11 +99,11 @@ func (j *jobFeedInstanceResourceInfo) updateDB() (err error) { return nil } -func (j *jobFeedInstanceResourceInfo) purgeDB() (err error) { +func (j *jobFeedInstanceResourceInfo) purgeDB(ctx context.Context) (err error) { if j.objectID == "" { return fmt.Errorf("purgeDB: objectID is empty") } - err = j.oDb.InstanceResourceInfoDelete(j.ctx, j.objectID, j.nodeID, j.now) + err = j.oDb.InstanceResourceInfoDelete(ctx, j.objectID, j.nodeID, j.now) if err != nil { return fmt.Errorf("InstanceResourceInfoDelete: %w", err) } diff --git a/worker/job_feed_instance_status.go b/worker/job_feed_instance_status.go index c36dc32..ab11e85 100644 --- a/worker/job_feed_instance_status.go +++ b/worker/job_feed_instance_status.go @@ -1,6 +1,7 @@ package worker import ( + "context" "encoding/json" "fmt" "log/slog" @@ -12,14 +13,16 @@ import ( ) type jobFeedInstanceStatus struct { - *BaseJob + JobBase + JobRedis + JobDB // idX is the id of the posted instance config with the expected pattern: @@: idX string objectName string - // objectID is db ID of the object found or created in database + // objectID is db ID of the object found or created in the database objectID string // nodeID is db ID of the node that have posted object data @@ -43,9 +46,11 @@ type jobFeedInstanceStatus struct { func newInstanceStatus(objectName, nodeID, clusterID string) *jobFeedInstanceStatus { idX := fmt.Sprintf("%s@%s@%s", objectName, nodeID, clusterID) return &jobFeedInstanceStatus{ - BaseJob: &BaseJob{ - name: "instanceStatus", - detail: "ID: " + idX, + JobBase: JobBase{ + name: "instanceStatus", + detail: "ID: " + idX, + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedInstanceStatusPendingH, cachePendingIDX: idX, }, @@ -58,22 +63,22 @@ func newInstanceStatus(objectName, nodeID, clusterID string) *jobFeedInstanceSta func (d *jobFeedInstanceStatus) Operations() []operation { return []operation{ - {desc: "instanceStatus/dropPending", do: d.dropPending}, - {desc: "instanceStatus/findNodeFromDb", do: d.findNodeFromDb}, - {desc: "instanceStatus/getData", do: d.getData}, - {desc: "instanceStatus/dbNow", do: d.dbNow}, - {desc: "instanceStatus/findObjectFromDb", do: d.findObjectFromDb}, - {desc: "instanceStatus/updateDB", do: d.updateDB}, - {desc: "instanceStatus/pushFromTableChanges", do: d.pushFromTableChanges}, - {desc: "instanceStatus/processed", do: d.processed}, + {desc: "instanceStatus/dropPending", doCtx: d.dropPending}, + {desc: "instanceStatus/findNodeFromDb", doCtx: d.findNodeFromDb}, + {desc: "instanceStatus/getData", doCtx: d.getData}, + {desc: "instanceStatus/dbNow", doCtx: d.dbNow}, + {desc: "instanceStatus/findObjectFromDb", doCtx: d.findObjectFromDb}, + {desc: "instanceStatus/updateDB", doCtx: d.updateDB}, + {desc: "instanceStatus/pushFromTableChanges", doCtx: d.pushFromTableChanges}, + {desc: "instanceStatus/processed", doCtx: d.processed}, } } -func (d *jobFeedInstanceStatus) getData() error { +func (d *jobFeedInstanceStatus) getData(ctx context.Context) error { var ( data map[string]any ) - if b, err := d.redis.HGet(d.ctx, cachekeys.FeedInstanceStatusH, d.idX).Bytes(); err != nil { + if b, err := d.redis.HGet(ctx, cachekeys.FeedInstanceStatusH, d.idX).Bytes(); err != nil { return fmt.Errorf("getData: HGET %s %s: %w", cachekeys.FeedInstanceStatusH, d.idX, err) } else if err = json.Unmarshal(b, &data); err != nil { return fmt.Errorf("getData: unexpected data from %s %s: %w", cachekeys.FeedInstanceStatusH, d.idX, err) @@ -100,8 +105,8 @@ func (d *jobFeedInstanceStatus) getData() error { return nil } -func (d *jobFeedInstanceStatus) findNodeFromDb() error { - if n, err := d.oDb.NodeByNodeID(d.ctx, d.nodeID); err != nil { +func (d *jobFeedInstanceStatus) findNodeFromDb(ctx context.Context) error { + if n, err := d.oDb.NodeByNodeID(ctx, d.nodeID); err != nil { return fmt.Errorf("findFromDb: node %s: %w", d.nodeID, err) } else { d.node = n @@ -112,8 +117,8 @@ func (d *jobFeedInstanceStatus) findNodeFromDb() error { } -func (d *jobFeedInstanceStatus) findObjectFromDb() error { - if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(d.ctx, d.objectName, d.clusterID); err != nil { +func (d *jobFeedInstanceStatus) findObjectFromDb(ctx context.Context) error { + if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(ctx, d.objectName, d.clusterID); err != nil { return fmt.Errorf("find or create object ID failed for %s: %w", d.objectName, err) } else if isNew { slog.Info(fmt.Sprintf("jobFeedInstanceStatus has created new object id %s@%s %s", d.objectName, d.clusterID, objId)) @@ -121,7 +126,7 @@ func (d *jobFeedInstanceStatus) findObjectFromDb() error { d.objectID = objId } - if obj, err := d.oDb.ObjectFromID(d.ctx, d.objectID); err != nil { + if obj, err := d.oDb.ObjectFromID(ctx, d.objectID); err != nil { return fmt.Errorf("findFromDb: object %s: %w", d.objectID, err) } else if obj == nil { return fmt.Errorf("findFromDb: object %s: not found", d.objectID) @@ -133,10 +138,11 @@ func (d *jobFeedInstanceStatus) findObjectFromDb() error { return nil } -func (d *jobFeedInstanceStatus) updateDB() error { +func (d *jobFeedInstanceStatus) updateDB(ctx context.Context) error { imonStates := make(map[string]bool) changes := make(map[string]struct{}) err := d.dbUpdateInstance( + ctx, d.status, d.objectID, d.nodeID, @@ -154,7 +160,7 @@ func (d *jobFeedInstanceStatus) updateDB() error { return nil } -func (d *jobFeedInstanceStatus) processed() error { - _ = d.redis.Publish(d.ctx, cachekeys.FeedInstanceStatusP, d.idX) +func (d *jobFeedInstanceStatus) processed(ctx context.Context) error { + _ = d.redis.Publish(ctx, cachekeys.FeedInstanceStatusP, d.idX) return nil } diff --git a/worker/job_feed_node_disk.go b/worker/job_feed_node_disk.go index b2ad3e8..1d2e27b 100644 --- a/worker/job_feed_node_disk.go +++ b/worker/job_feed_node_disk.go @@ -1,12 +1,13 @@ package worker import ( + "context" "encoding/json" "fmt" "log/slog" "strings" - redis "github.com/go-redis/redis/v8" + "github.com/go-redis/redis/v8" "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/mariadb" @@ -14,7 +15,9 @@ import ( type ( jobFeedNodeDisk struct { - *BaseJob + JobBase + JobRedis + JobDB nodename string nodeID string @@ -25,10 +28,11 @@ type ( func newNodeDisk(nodename, nodeID, clusterID string) *jobFeedNodeDisk { return &jobFeedNodeDisk{ - BaseJob: &BaseJob{ + JobBase: JobBase{ name: "nodeDisk", detail: "nodename: " + nodename + " nodeID: " + nodeID, - + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedNodeDiskPendingH, cachePendingIDX: nodename + "@" + nodeID + "@" + clusterID, }, @@ -40,16 +44,16 @@ func newNodeDisk(nodename, nodeID, clusterID string) *jobFeedNodeDisk { func (d *jobFeedNodeDisk) Operations() []operation { return []operation{ - {desc: "nodeDisk/dropPending", do: d.dropPending}, - {desc: "nodeDisk/getData", do: d.getData}, - {desc: "nodeDisk/dbNow", do: d.dbNow}, - {desc: "nodeDisk/updateDB", do: d.updateDB}, - {desc: "nodeDisk/pushFromTableChanges", do: d.pushFromTableChanges}, + {desc: "nodeDisk/dropPending", doCtx: d.dropPending}, + {desc: "nodeDisk/getData", doCtx: d.getData}, + {desc: "nodeDisk/dbNow", doCtx: d.dbNow}, + {desc: "nodeDisk/updateDB", doCtx: d.updateDB}, + {desc: "nodeDisk/pushFromTableChanges", doCtx: d.pushFromTableChanges}, } } -func (d *jobFeedNodeDisk) getData() error { - cmd := d.redis.HGet(d.ctx, cachekeys.FeedNodeDiskH, d.cachePendingIDX) +func (d *jobFeedNodeDisk) getData(ctx context.Context) error { + cmd := d.redis.HGet(ctx, cachekeys.FeedNodeDiskH, d.cachePendingIDX) result, err := cmd.Result() switch err { case nil: @@ -116,9 +120,9 @@ func (d *jobFeedNodeDisk) getData() error { // KEY `k_svc_id` (`svc_id`), // KEY `k_svcdisks_1` (`svc_id`,`node_id`) // ) ENGINE=InnoDB AUTO_INCREMENT=4641237 DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci -func (d *jobFeedNodeDisk) updateDB() error { +func (d *jobFeedNodeDisk) updateDB(ctx context.Context) error { var ( - // pathToObjectID is a map of object path to object ID, to cache db results + // pathToObjectID is a map of an object path to object ID, to cache db results pathToObjectID = make(map[string]string) // appIDM is a map of objectID@nodeID to app ID, to cache appIDFromObjectOrNodeIDs results @@ -156,16 +160,16 @@ func (d *jobFeedNodeDisk) updateDB() error { } devID := strings.ToUpper(diskID[26:28] + ":" + diskID[28:30] + ":" + diskID[30:]) portnamePrefix := "50" + devID[2:12] + `%` - if newDiskID, err := d.oDb.DiskIDFromDiskinfoWithDevIDAndTargetID(d.ctx, devID, portnamePrefix, diskID); err != nil { + if newDiskID, err := d.oDb.DiskIDFromDiskinfoWithDevIDAndTargetID(ctx, devID, portnamePrefix, diskID); err != nil { return fmt.Errorf("search diskinfo on OPEN-V disk with diskID %s: %w", diskID, err) } else if newDiskID != "" { - if err := d.oDb.UpdateDiskinfoDiskID(d.ctx, diskID, newDiskID); err != nil { + if err := d.oDb.UpdateDiskinfoDiskID(ctx, diskID, newDiskID); err != nil { return fmt.Errorf("UpdateDiskinfoDiskID on OPEN-V disk: %w", err) } } } - diskL, err := d.oDb.DiskinfoByDiskID(d.ctx, diskID) + diskL, err := d.oDb.DiskinfoByDiskID(ctx, diskID) if err != nil { return fmt.Errorf("DiskinfoByDiskID: %w", err) } @@ -177,7 +181,7 @@ func (d *jobFeedNodeDisk) updateDB() error { // diskinfo registered as a stub for a local disk line["local"] = "T" if len(diskL) == 1 { - if changed, err := d.oDb.UpdateDiskinfoArrayID(d.ctx, diskID, nodeID); err != nil { + if changed, err := d.oDb.UpdateDiskinfoArrayID(ctx, diskID, nodeID); err != nil { return fmt.Errorf("UpdateDiskinfoArrayID: %w", err) } else if changed { d.oDb.SetChange("diskinfo") @@ -191,20 +195,20 @@ func (d *jobFeedNodeDisk) updateDB() error { if strings.HasPrefix(diskID, d.nodeID+".") && len(diskL) == 0 { line["local"] = "T" devID := strings.TrimPrefix(diskID, d.nodeID+".") - if changed, err := d.oDb.UpdateDiskinfoArrayAndDevIDsAndSize(d.ctx, diskID, nodeID, devID, int32(line["size"].(float64))); err != nil { + if changed, err := d.oDb.UpdateDiskinfoArrayAndDevIDsAndSize(ctx, diskID, nodeID, devID, int32(line["size"].(float64))); err != nil { return fmt.Errorf("updateDiskinfoArrayAndDevIDsAndSize: %w", err) } else if changed { d.oDb.SetChange("diskinfo") } } else if len(diskL) == 0 { line["local"] = "F" - if changed, err := d.oDb.UpdateDiskinfoForDiskSize(d.ctx, diskID, int32(line["size"].(float64))); err != nil { + if changed, err := d.oDb.UpdateDiskinfoForDiskSize(ctx, diskID, int32(line["size"].(float64))); err != nil { return fmt.Errorf("updateDiskinfoForDiskSize: %w", err) } else if changed { d.oDb.SetChange("diskinfo") } - if changed, err := d.oDb.UpdateDiskinfoSetMissingArrayID(d.ctx, diskID, nodeID); err != nil { + if changed, err := d.oDb.UpdateDiskinfoSetMissingArrayID(ctx, diskID, nodeID); err != nil { return fmt.Errorf("updateDiskinfoSetMissingArrayID: %w", err) } else if changed { d.oDb.SetChange("diskinfo") @@ -219,7 +223,7 @@ func (d *jobFeedNodeDisk) updateDB() error { if objectPath != "" { if objectID, ok := pathToObjectID[objectPath]; ok { line["svc_id"] = objectID - } else if created, objectID, err := d.oDb.ObjectIDFindOrCreate(d.ctx, objectPath, d.clusterID); err != nil { + } else if created, objectID, err := d.oDb.ObjectIDFindOrCreate(ctx, objectPath, d.clusterID); err != nil { return fmt.Errorf("objectIDFindOrCreate: %w", err) } else { if created { @@ -236,7 +240,7 @@ func (d *jobFeedNodeDisk) updateDB() error { // Assigns line["app_id"] with appID, or nil if appID is not detected // line["app_id"] must be defined (mapping doesn't support Optional when its data - // is []any. + // is []any). objectID := line["svc_id"].(string) if appID, ok := appIDM[objectID+"@"+nodeID]; ok { if appID != 0 { @@ -244,7 +248,7 @@ func (d *jobFeedNodeDisk) updateDB() error { } else { line["app_id"] = nil } - } else if appID, ok, err := d.oDb.AppIDFromObjectOrNodeIDs(d.ctx, nodeID, objectID); err != nil { + } else if appID, ok, err := d.oDb.AppIDFromObjectOrNodeIDs(ctx, nodeID, objectID); err != nil { return fmt.Errorf("appIDFromObjectOrNodeIDs: %w", err) } else if !ok { appIDM[objectID+"@"+nodeID] = 0 @@ -280,14 +284,14 @@ func (d *jobFeedNodeDisk) updateDB() error { Keys: []string{"disk_id", "svc_id", "node_id", "disk_dg"}, Data: data, } - if affected, err := request.ExecContextAndCountRowsAffected(d.ctx, d.db); err != nil { + if affected, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return fmt.Errorf("updateDB insert: %w", err) } else if affected > 0 { d.oDb.SetChange("svcdisks") } query := "DELETE FROM `svcdisks` WHERE `node_id` = ? AND `disk_updated` < ?" - if result, err := d.db.ExecContext(d.ctx, query, nodeID, now); err != nil { + if result, err := d.db.ExecContext(ctx, query, nodeID, now); err != nil { return fmt.Errorf("query %s: %w", query, err) } else if affected, err := result.RowsAffected(); err != nil { return fmt.Errorf("query %s count row affected: %w", query, err) @@ -297,7 +301,7 @@ func (d *jobFeedNodeDisk) updateDB() error { // TODO: validate delete query query = "DELETE FROM `diskinfo` WHERE `disk_arrayid` = ? AND `disk_updated` < ?" - if result, err := d.db.ExecContext(d.ctx, query, nodeID, now); err != nil { + if result, err := d.db.ExecContext(ctx, query, nodeID, now); err != nil { return fmt.Errorf("query %s: %w", query, err) } else if affected, err := result.RowsAffected(); err != nil { return fmt.Errorf("query %s count row affected: %w", query, err) diff --git a/worker/job_feed_object_config.go b/worker/job_feed_object_config.go index a67bd7e..59ee306 100644 --- a/worker/job_feed_object_config.go +++ b/worker/job_feed_object_config.go @@ -1,6 +1,7 @@ package worker import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -8,7 +9,7 @@ import ( "reflect" "strings" - redis "github.com/go-redis/redis/v8" + "github.com/go-redis/redis/v8" "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/cdb" @@ -16,7 +17,9 @@ import ( type ( jobFeedObjectConfig struct { - *BaseJob + JobBase + JobRedis + JobDB // idX is the id of the posted object config with the expected pattern: @@ idX string @@ -37,10 +40,11 @@ type ( func newFeedObjectConfig(objectName, nodeID, clusterID string) *jobFeedObjectConfig { idX := fmt.Sprintf("%s@%s@%s", objectName, nodeID, clusterID) return &jobFeedObjectConfig{ - BaseJob: &BaseJob{ + JobBase: JobBase{ name: "objectConfig", detail: "ID: " + idX, - + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedObjectConfigPendingH, cachePendingIDX: idX, }, @@ -53,16 +57,16 @@ func newFeedObjectConfig(objectName, nodeID, clusterID string) *jobFeedObjectCon func (d *jobFeedObjectConfig) Operations() []operation { return []operation{ - {desc: "objectConfig/dropPending", do: d.dropPending}, - {desc: "objectConfig/getData", do: d.getData}, - {desc: "objectConfig/dbNow", do: d.dbNow}, - {desc: "objectConfig/updateDB", do: d.updateDB}, - {desc: "objectConfig/pushFromTableChanges", do: d.pushFromTableChanges}, + {desc: "objectConfig/dropPending", doCtx: d.dropPending}, + {desc: "objectConfig/getData", doCtx: d.getData}, + {desc: "objectConfig/dbNow", doCtx: d.dbNow}, + {desc: "objectConfig/updateDB", doCtx: d.updateDB}, + {desc: "objectConfig/pushFromTableChanges", doCtx: d.pushFromTableChanges}, } } -func (d *jobFeedObjectConfig) getData() error { - cmd := d.redis.HGet(d.ctx, cachekeys.FeedObjectConfigH, d.idX) +func (d *jobFeedObjectConfig) getData(ctx context.Context) error { + cmd := d.redis.HGet(ctx, cachekeys.FeedObjectConfigH, d.idX) result, err := cmd.Result() switch err { case nil: @@ -77,7 +81,7 @@ func (d *jobFeedObjectConfig) getData() error { return nil } -func (d *jobFeedObjectConfig) updateDB() (err error) { +func (d *jobFeedObjectConfig) updateDB(ctx context.Context) (err error) { // expected data //instanceConfigPost struct { // Path string `json:"path"` @@ -103,7 +107,7 @@ func (d *jobFeedObjectConfig) updateDB() (err error) { // RawConfig []byte `json:"raw_config"` //} var cfg *cdb.DBObjectConfig - if created, objectID, err := d.oDb.ObjectIDFindOrCreate(d.ctx, d.objectName, d.clusterID); err != nil { + if created, objectID, err := d.oDb.ObjectIDFindOrCreate(ctx, d.objectName, d.clusterID); err != nil { return err } else { if created { @@ -155,7 +159,7 @@ func (d *jobFeedObjectConfig) updateDB() (err error) { cfg.Env = &s } slog.Info(fmt.Sprintf("insertOrUpdateObjectConfig %s@%s@%s", cfg.Name, cfg.SvcID, cfg.ClusterID)) - if hasRowAffected, err := d.oDb.InsertOrUpdateObjectConfig(d.ctx, cfg); err != nil { + if hasRowAffected, err := d.oDb.InsertOrUpdateObjectConfig(ctx, cfg); err != nil { return err } else if hasRowAffected { d.oDb.SetChange("services") diff --git a/worker/job_feed_system.go b/worker/job_feed_system.go index 0c06413..55dc920 100644 --- a/worker/job_feed_system.go +++ b/worker/job_feed_system.go @@ -1,11 +1,12 @@ package worker import ( + "context" "encoding/json" "fmt" "log/slog" - redis "github.com/go-redis/redis/v8" + "github.com/go-redis/redis/v8" "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/mariadb" @@ -13,7 +14,9 @@ import ( type ( jobFeedSystem struct { - *BaseJob + JobBase + JobRedis + JobDB nodeID string data map[string]any @@ -22,10 +25,11 @@ type ( func newDaemonSystem(nodeID string) *jobFeedSystem { return &jobFeedSystem{ - BaseJob: &BaseJob{ + JobBase: JobBase{ name: "daemonSystem", detail: "nodeID: " + nodeID, - + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedSystemPendingH, cachePendingIDX: nodeID, }, @@ -41,21 +45,21 @@ func (d *jobFeedSystem) Operations() []operation { } } return []operation{ - {desc: "system/dropPending", do: d.dropPending}, - {desc: "system/getData", do: d.getData}, - {desc: "system/dbNow", do: d.dbNow}, - {desc: "system/hardware", do: d.hardware, condition: hasProp("hardware"), blocking: true}, - {desc: "system/properties", do: d.properties, condition: hasProp("properties"), blocking: true}, - {desc: "system/groups", do: d.groups, condition: hasProp("gids"), blocking: true}, - {desc: "system/users", do: d.users, condition: hasProp("uids"), blocking: true}, - {desc: "system/lan", do: d.lan, condition: hasProp("lan"), blocking: true}, - {desc: "system/hba", do: d.hba, condition: hasProp("hba"), blocking: true}, - {desc: "system/targets", do: d.targets, condition: hasProp("targets"), blocking: true}, - {desc: "system/package", do: d.pkg, condition: hasProp("package"), blocking: true}, + {desc: "system/dropPending", doCtx: d.dropPending}, + {desc: "system/getData", doCtx: d.getData}, + {desc: "system/dbNow", doCtx: d.dbNow}, + {desc: "system/hardware", doCtx: d.hardware, condition: hasProp("hardware"), blocking: true}, + {desc: "system/properties", doCtx: d.properties, condition: hasProp("properties"), blocking: true}, + {desc: "system/groups", doCtx: d.groups, condition: hasProp("gids"), blocking: true}, + {desc: "system/users", doCtx: d.users, condition: hasProp("uids"), blocking: true}, + {desc: "system/lan", doCtx: d.lan, condition: hasProp("lan"), blocking: true}, + {desc: "system/hba", doCtx: d.hba, condition: hasProp("hba"), blocking: true}, + {desc: "system/targets", doCtx: d.targets, condition: hasProp("targets"), blocking: true}, + {desc: "system/package", doCtx: d.pkg, condition: hasProp("package"), blocking: true}, } } -func (d *jobFeedSystem) pkg() error { +func (d *jobFeedSystem) pkg(ctx context.Context) error { pkgList, ok := d.data["package"].([]any) if !ok { slog.Warn(fmt.Sprint("unsupported json format for packages")) @@ -91,22 +95,22 @@ func (d *jobFeedSystem) pkg() error { Data: pkgList, } - if _, err := request.QueryContext(d.ctx, d.db); err != nil { + if _, err := request.QueryContext(ctx, d.db); err != nil { return err } - if rows, err := d.db.QueryContext(d.ctx, "DELETE FROM packages WHERE node_id = ? AND pkg_updated < ?", nodeID, now); err != nil { + if rows, err := d.db.QueryContext(ctx, "DELETE FROM packages WHERE node_id = ? AND pkg_updated < ?", nodeID, now); err != nil { return err } else { - defer rows.Close() + defer func() { _ = rows.Close() }() } - if err := d.oDb.DashboardUpdatePkgDiffForNode(d.ctx, nodeID); err != nil { + if err := d.oDb.DashboardUpdatePkgDiffForNode(ctx, nodeID); err != nil { return err } return nil } -func (d *jobFeedSystem) targets() error { +func (d *jobFeedSystem) targets(ctx context.Context) error { data, ok := d.data["targets"].([]any) if !ok { slog.Warn("unsupported system targets data format") @@ -115,7 +119,7 @@ func (d *jobFeedSystem) targets() error { nodeID := d.nodeID now := d.now - for i, _ := range data { + for i := range data { line, ok := data[i].(map[string]any) if !ok { slog.Warn("unsupported system targets entry format") @@ -138,20 +142,20 @@ func (d *jobFeedSystem) targets() error { Data: data, } - if _, err := request.QueryContext(d.ctx, d.db); err != nil { + if _, err := request.QueryContext(ctx, d.db); err != nil { return err } - if rows, err := d.db.QueryContext(d.ctx, "DELETE FROM stor_zone WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if rows, err := d.db.QueryContext(ctx, "DELETE FROM stor_zone WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err } else { - defer rows.Close() + defer func() { _ = rows.Close() }() } return nil } -func (d *jobFeedSystem) hba() error { +func (d *jobFeedSystem) hba(ctx context.Context) error { data, ok := d.data["hba"].([]any) if !ok { slog.Warn("unsupported system hba data format") @@ -160,7 +164,7 @@ func (d *jobFeedSystem) hba() error { nodeID := d.nodeID now := d.now - for i, _ := range data { + for i := range data { line, ok := data[i].(map[string]any) if !ok { slog.Warn("unsupported system hba entry format") @@ -183,20 +187,20 @@ func (d *jobFeedSystem) hba() error { Data: data, } - if _, err := request.QueryContext(d.ctx, d.db); err != nil { + if _, err := request.QueryContext(ctx, d.db); err != nil { return err } - if rows, err := d.db.QueryContext(d.ctx, "DELETE FROM node_hba WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_hba WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err } else { - defer rows.Close() + defer func() { _ = rows.Close() }() } return nil } -func (d *jobFeedSystem) lan() error { +func (d *jobFeedSystem) lan(ctx context.Context) error { var l []any data, ok := d.data["lan"].(map[string]any) if !ok { @@ -241,20 +245,20 @@ func (d *jobFeedSystem) lan() error { Data: l, } - if _, err := request.QueryContext(d.ctx, d.db); err != nil { + if _, err := request.QueryContext(ctx, d.db); err != nil { return err } - if rows, err := d.db.QueryContext(d.ctx, "DELETE FROM node_ip WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_ip WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err } else { - defer rows.Close() + defer func() { _ = rows.Close() }() } return nil } -func (d *jobFeedSystem) groups() error { +func (d *jobFeedSystem) groups(ctx context.Context) error { data, ok := d.data["gids"].([]any) if !ok { slog.Warn("unsupported system groups data format") @@ -263,7 +267,7 @@ func (d *jobFeedSystem) groups() error { nodeID := d.nodeID now := d.now - for i, _ := range data { + for i := range data { line, ok := data[i].(map[string]any) if !ok { slog.Warn("unsupported system groups entry format") @@ -286,20 +290,20 @@ func (d *jobFeedSystem) groups() error { Data: data, } - if _, err := request.QueryContext(d.ctx, d.db); err != nil { + if _, err := request.QueryContext(ctx, d.db); err != nil { return err } - if rows, err := d.db.QueryContext(d.ctx, "DELETE FROM node_groups WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_groups WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err } else { - defer rows.Close() + defer func() { _ = rows.Close() }() } return nil } -func (d *jobFeedSystem) users() error { +func (d *jobFeedSystem) users(ctx context.Context) error { data, ok := d.data["uids"].([]any) if !ok { slog.Warn("unsupported system users data format") @@ -308,7 +312,7 @@ func (d *jobFeedSystem) users() error { nodeID := d.nodeID now := d.now - for i, _ := range data { + for i := range data { line, ok := data[i].(map[string]any) if !ok { slog.Warn("unsupported system users entry format") @@ -331,20 +335,20 @@ func (d *jobFeedSystem) users() error { Data: data, } - if _, err := request.QueryContext(d.ctx, d.db); err != nil { + if _, err := request.QueryContext(ctx, d.db); err != nil { return err } - if rows, err := d.db.QueryContext(d.ctx, "DELETE FROM node_users WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_users WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err } else { - defer rows.Close() + defer func() { _ = rows.Close() }() } return nil } -func (d *jobFeedSystem) hardware() error { +func (d *jobFeedSystem) hardware(ctx context.Context) error { data, ok := d.data["hardware"].([]any) if !ok { slog.Warn("unsupported system hardware data format") @@ -352,7 +356,7 @@ func (d *jobFeedSystem) hardware() error { } nodeID := d.nodeID now := d.now - for i, _ := range data { + for i := range data { line, ok := data[i].(map[string]any) if !ok { slog.Warn("unsupported system hardware entry format") @@ -378,20 +382,20 @@ func (d *jobFeedSystem) hardware() error { Data: data, } - if _, err := request.QueryContext(d.ctx, d.db); err != nil { + if _, err := request.QueryContext(ctx, d.db); err != nil { return err } - if rows, err := d.db.QueryContext(d.ctx, "DELETE FROM node_hw WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_hw WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err } else { - defer rows.Close() + defer func() { _ = rows.Close() }() } return nil } -func (d *jobFeedSystem) properties() error { +func (d *jobFeedSystem) properties(ctx context.Context) error { data, ok := d.data["properties"].(map[string]any) if !ok { slog.Warn("unsupported system properties format") @@ -464,13 +468,13 @@ func (d *jobFeedSystem) properties() error { Data: data, } - _, err := request.QueryContext(d.ctx, d.db) + _, err := request.QueryContext(ctx, d.db) return err } -func (d *jobFeedSystem) getData() error { - cmd := d.redis.HGet(d.ctx, cachekeys.FeedSystemH, d.nodeID) +func (d *jobFeedSystem) getData(ctx context.Context) error { + cmd := d.redis.HGet(ctx, cachekeys.FeedSystemH, d.nodeID) result, err := cmd.Result() switch err { case nil: diff --git a/worker/worker.go b/worker/worker.go index aaa8a9a..b630bbe 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -8,7 +8,7 @@ import ( "strings" "time" - redis "github.com/go-redis/redis/v8" + "github.com/go-redis/redis/v8" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -47,7 +47,6 @@ type ( JobRunner interface { Operationer - DBGetter Name() string Detail() string } @@ -125,6 +124,7 @@ func (w *Worker) runJob(unqueuedJob []string) error { var j JobRunner slog.Debug(fmt.Sprintf("BLPOP %s -> %s", unqueuedJob[0], unqueuedJob[1])) ctx := context.Background() + withTx := w.WithTx switch unqueuedJob[0] { case cachekeys.FeedDaemonPingQ: j = newDaemonPing(unqueuedJob[1]) @@ -179,10 +179,15 @@ func (w *Worker) runJob(unqueuedJob []string) error { return nil } workType := j.Name() + + if a, ok := j.(RedisSetter); ok { + a.SetRedis(w.Redis) + } + if a, ok := j.(PrepareDBer); ok { - if err := a.PrepareDB(ctx, w.DB, w.Ev, w.WithTx); err != nil { - slog.Error(fmt.Sprintf("🔴can't get db for %s: %s", workType, err)) - return fmt.Errorf("can't get db for %s: %w", workType, err) + if err := a.PrepareDB(ctx, w.DB, w.Ev, withTx); err != nil { + slog.Error(fmt.Sprintf("🔴can't prepare db for %s: %s", workType, err)) + return fmt.Errorf("can't prepare db for %s: %w", workType, err) } } @@ -194,7 +199,7 @@ func (w *Worker) runJob(unqueuedJob []string) error { a.SetEv(w.Ev) } status := operationStatusOk - err := RunJob(j) + err := RunJob(ctx, j) duration := time.Since(begin) if err != nil { status = operationStatusFailed @@ -208,7 +213,7 @@ func (w *Worker) runJob(unqueuedJob []string) error { // jobToInstanceAndClusterID splits a jobName string into path, nodeID, and clusterID based on "@" delimiter. // Returns an error if the format is invalid or elements are empty. -// expected jobName: foo@@ +// Expected jobName: foo@@ func (w *Worker) jobToInstanceAndClusterID(jobName string) (path, nodeID, clusterID string, err error) { l := strings.Split(jobName, "@") if len(l) != 3 || l[0] == "" || l[1] == "" || l[2] == "" { From 510f93d8f03a46af9aa26941c0899f64c6860831 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Fri, 16 Jan 2026 16:51:00 +0100 Subject: [PATCH 2/3] Refactor jobFeedAction --- worker/job_feed_action.go | 60 ++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/worker/job_feed_action.go b/worker/job_feed_action.go index 083c3f4..d7e7828 100644 --- a/worker/job_feed_action.go +++ b/worker/job_feed_action.go @@ -1,19 +1,23 @@ package worker import ( + "context" "encoding/json" "fmt" "log/slog" "time" "github.com/google/uuid" + "github.com/opensvc/oc3/api" "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/cdb" ) type jobFeedAction struct { - *BaseJob + JobBase + JobRedis + JobDB nodeID string clusterID string @@ -35,9 +39,11 @@ func newAction(objectName, nodeID, clusterID, uuid string) *jobFeedAction { idX := fmt.Sprintf("%s@%s@%s:%s", objectName, nodeID, clusterID, uuid) return &jobFeedAction{ - BaseJob: &BaseJob{ - name: "action", - detail: "ID: " + idX, + JobBase: JobBase{ + name: "action", + detail: "ID: " + idX, + }, + JobRedis: JobRedis{ cachePendingH: cachekeys.FeedActionPendingH, cachePendingIDX: idX, }, @@ -50,20 +56,20 @@ func newAction(objectName, nodeID, clusterID, uuid string) *jobFeedAction { func (d *jobFeedAction) Operations() []operation { return []operation{ - {desc: "actionBegin/dropPending", do: d.dropPending}, - {desc: "actionBegin/getData", do: d.getData}, - {desc: "actionBegin/findNodeFromDb", do: d.findNodeFromDb}, - {desc: "actionBegin/findObjectFromDb", do: d.findObjectFromDb}, - {desc: "actionBegin/processAction", do: d.updateDB}, - {desc: "actionBegin/pushFromTableChanges", do: d.pushFromTableChanges}, + {desc: "actionBegin/dropPending", doCtx: d.dropPending}, + {desc: "actionBegin/getData", doCtx: d.getData}, + {desc: "actionBegin/findNodeFromDb", doCtx: d.findNodeFromDb}, + {desc: "actionBegin/findObjectFromDb", doCtx: d.findObjectFromDb}, + {desc: "actionBegin/processAction", doCtx: d.updateDB}, + {desc: "actionBegin/pushFromTableChanges", doCtx: d.pushFromTableChanges}, } } -func (d *jobFeedAction) getData() error { +func (d *jobFeedAction) getData(ctx context.Context) error { var ( data api.PostFeedActionJSONRequestBody ) - if b, err := d.redis.HGet(d.ctx, cachekeys.FeedActionH, d.idX).Bytes(); err != nil { + if b, err := d.redis.HGet(ctx, cachekeys.FeedActionH, d.idX).Bytes(); err != nil { return fmt.Errorf("getData: HGET %s %s: %w", cachekeys.FeedActionH, d.idX, err) } else if err = json.Unmarshal(b, &data); err != nil { return fmt.Errorf("getData: unexpected data from %s %s: %w", cachekeys.FeedActionH, d.idX, err) @@ -76,8 +82,8 @@ func (d *jobFeedAction) getData() error { return nil } -func (d *jobFeedAction) findNodeFromDb() error { - if n, err := d.oDb.NodeByNodeID(d.ctx, d.nodeID); err != nil { +func (d *jobFeedAction) findNodeFromDb(ctx context.Context) error { + if n, err := d.oDb.NodeByNodeID(ctx, d.nodeID); err != nil { return fmt.Errorf("findNodeFromDb: node %s: %w", d.nodeID, err) } else { d.node = n @@ -86,8 +92,8 @@ func (d *jobFeedAction) findNodeFromDb() error { return nil } -func (d *jobFeedAction) findObjectFromDb() error { - if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(d.ctx, d.objectName, d.clusterID); err != nil { +func (d *jobFeedAction) findObjectFromDb(ctx context.Context) error { + if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(ctx, d.objectName, d.clusterID); err != nil { return fmt.Errorf("find or create object ID failed for %s: %w", d.objectName, err) } else if isNew { slog.Info(fmt.Sprintf("jobFeedActionBegin has created new object id %s@%s %s", d.objectName, d.clusterID, objId)) @@ -99,7 +105,7 @@ func (d *jobFeedAction) findObjectFromDb() error { return nil } -func (d *jobFeedAction) updateDB() error { +func (d *jobFeedAction) updateDB(ctx context.Context) error { if d.data == nil || d.data.Path == "" { return fmt.Errorf("invalid action data: missing path") } @@ -117,11 +123,11 @@ func (d *jobFeedAction) updateDB() error { return fmt.Errorf("invalid begin time format: %w", err) } - status_log := "" + statusLog := "" if len(d.data.Argv) > 0 { - status_log = fmt.Sprintf("%s", d.data.Argv[0]) + statusLog = fmt.Sprintf("%s", d.data.Argv[0]) for i := 1; i < len(d.data.Argv); i++ { - status_log += " " + d.data.Argv[i] + statusLog += " " + d.data.Argv[i] } } @@ -132,35 +138,37 @@ func (d *jobFeedAction) updateDB() error { return fmt.Errorf("invalid end time format: %w", err) } - actionId, err := d.oDb.FindActionID(d.ctx, d.nodeID, d.objectID, beginTime, d.data.Action) + actionId, err := d.oDb.FindActionID(ctx, d.nodeID, d.objectID, beginTime, d.data.Action) if err != nil { return fmt.Errorf("find action ID failed: %w", err) } if actionId == 0 { // begin not processed yet, insert full record - if _, err := d.oDb.InsertSvcAction(d.ctx, objectUUID, nodeUUID, d.data.Action, beginTime, status_log, d.data.SessionUuid, d.data.Cron, endTime, d.data.Status); err != nil { + if _, err := d.oDb.InsertSvcAction(ctx, objectUUID, nodeUUID, d.data.Action, beginTime, statusLog, d.data.SessionUuid, d.data.Cron, endTime, d.data.Status); err != nil { return fmt.Errorf("insert svc action failed: %w", err) } } else { // begin already processed, update record with end info - if err := d.oDb.UpdateSvcAction(d.ctx, actionId, endTime, d.data.Status); err != nil { + if err := d.oDb.UpdateSvcAction(ctx, actionId, endTime, d.data.Status); err != nil { return fmt.Errorf("end svc action failed: %w", err) } } if d.data.Status == "err" { - if err := d.oDb.UpdateActionErrors(d.ctx, d.objectID, d.nodeID); err != nil { + if err := d.oDb.UpdateActionErrors(ctx, d.objectID, d.nodeID); err != nil { return fmt.Errorf("update action errors failed: %w", err) } - if err := d.oDb.UpdateDashActionErrors(d.ctx, d.objectID, d.nodeID); err != nil { + if err := d.oDb.UpdateDashActionErrors(ctx, d.objectID, d.nodeID); err != nil { return fmt.Errorf("update dash action errors failed: %w", err) } } } else { // field End is not present, process as action begin - d.oDb.InsertSvcAction(d.ctx, objectUUID, nodeUUID, d.data.Action, beginTime, status_log, d.data.SessionUuid, d.data.Cron, time.Time{}, "") + if _, err := d.oDb.InsertSvcAction(ctx, objectUUID, nodeUUID, d.data.Action, beginTime, statusLog, d.data.SessionUuid, d.data.Cron, time.Time{}, ""); err != nil { + return fmt.Errorf("insert new action failed: %w", err) + } } return nil From 31616691ddb6de5a74613225e0e4db5a7110d58b Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Fri, 16 Jan 2026 16:59:28 +0100 Subject: [PATCH 3/3] Deprecate operations without context --- worker/base_job.go | 11 ++----- worker/job_feed_action.go | 12 +++---- worker/job_feed_daemon_ping.go | 16 +++++----- worker/job_feed_daemon_status.go | 38 +++++++++++------------ worker/job_feed_instance_resource_info.go | 12 +++---- worker/job_feed_instance_status.go | 16 +++++----- worker/job_feed_node_disk.go | 10 +++--- worker/job_feed_object_config.go | 10 +++--- worker/job_feed_system.go | 22 ++++++------- 9 files changed, 71 insertions(+), 76 deletions(-) diff --git a/worker/base_job.go b/worker/base_job.go index c2e0359..a191860 100644 --- a/worker/base_job.go +++ b/worker/base_job.go @@ -18,9 +18,8 @@ type ( } operation struct { - desc string - do func() error - doCtx func(context.Context) error + desc string + do func(context.Context) error // blocking stops the operation chain on operation error blocking bool @@ -87,11 +86,7 @@ func runOps(ctx context.Context, ops ...operation) error { continue } begin := time.Now() - if op.doCtx != nil { - err = op.doCtx(ctx) - } else if op.do != nil { - err = op.do() - } + err = op.do(ctx) duration := time.Since(begin) if err != nil { operationDuration. diff --git a/worker/job_feed_action.go b/worker/job_feed_action.go index d7e7828..07690bb 100644 --- a/worker/job_feed_action.go +++ b/worker/job_feed_action.go @@ -56,12 +56,12 @@ func newAction(objectName, nodeID, clusterID, uuid string) *jobFeedAction { func (d *jobFeedAction) Operations() []operation { return []operation{ - {desc: "actionBegin/dropPending", doCtx: d.dropPending}, - {desc: "actionBegin/getData", doCtx: d.getData}, - {desc: "actionBegin/findNodeFromDb", doCtx: d.findNodeFromDb}, - {desc: "actionBegin/findObjectFromDb", doCtx: d.findObjectFromDb}, - {desc: "actionBegin/processAction", doCtx: d.updateDB}, - {desc: "actionBegin/pushFromTableChanges", doCtx: d.pushFromTableChanges}, + {desc: "actionBegin/dropPending", do: d.dropPending}, + {desc: "actionBegin/getData", do: d.getData}, + {desc: "actionBegin/findNodeFromDb", do: d.findNodeFromDb}, + {desc: "actionBegin/findObjectFromDb", do: d.findObjectFromDb}, + {desc: "actionBegin/processAction", do: d.updateDB}, + {desc: "actionBegin/pushFromTableChanges", do: d.pushFromTableChanges}, } } diff --git a/worker/job_feed_daemon_ping.go b/worker/job_feed_daemon_ping.go index 17de815..276ed77 100644 --- a/worker/job_feed_daemon_ping.go +++ b/worker/job_feed_daemon_ping.go @@ -54,14 +54,14 @@ func newDaemonPing(nodeID string) *jobFeedDaemonPing { func (d *jobFeedDaemonPing) Operations() []operation { return []operation{ - {desc: "daemonPing/dropPending", doCtx: d.dropPending}, - {desc: "daemonPing/getData", doCtx: d.getData}, - {desc: "daemonPing/dbFetchNodes", doCtx: d.dbFetchNodes}, - {desc: "daemonPing/dbFetchObjects", doCtx: d.dbFetchObjects}, - {desc: "daemonPing/dbPingInstances", doCtx: d.dbPingInstances}, - {desc: "daemonPing/dbPingObjects", doCtx: d.dbPingObjects}, - {desc: "daemonPing/cacheObjectsWithoutConfig", doCtx: d.cacheObjectsWithoutConfig}, - {desc: "daemonPing/pushFromTableChanges", doCtx: d.pushFromTableChanges}, + {desc: "daemonPing/dropPending", do: d.dropPending}, + {desc: "daemonPing/getData", do: d.getData}, + {desc: "daemonPing/dbFetchNodes", do: d.dbFetchNodes}, + {desc: "daemonPing/dbFetchObjects", do: d.dbFetchObjects}, + {desc: "daemonPing/dbPingInstances", do: d.dbPingInstances}, + {desc: "daemonPing/dbPingObjects", do: d.dbPingObjects}, + {desc: "daemonPing/cacheObjectsWithoutConfig", do: d.cacheObjectsWithoutConfig}, + {desc: "daemonPing/pushFromTableChanges", do: d.pushFromTableChanges}, } } diff --git a/worker/job_feed_daemon_status.go b/worker/job_feed_daemon_status.go index ebd1afb..8d36a2a 100644 --- a/worker/job_feed_daemon_status.go +++ b/worker/job_feed_daemon_status.go @@ -111,25 +111,25 @@ func newDaemonStatus(nodeID string) *jobFeedDaemonStatus { func (d *jobFeedDaemonStatus) Operations() []operation { return []operation{ - {desc: "daemonStatus/dropPending", doCtx: d.dropPending}, - {desc: "daemonStatus/dbNow", doCtx: d.dbNow}, - {desc: "daemonStatus/getChanges", doCtx: d.getChanges}, - {desc: "daemonStatus/getData", doCtx: d.getData}, - {desc: "daemonStatus/dbCheckClusterIDForNodeID", doCtx: d.dbCheckClusterIDForNodeID}, - {desc: "daemonStatus/dbCheckClusters", doCtx: d.dbCheckClusters}, - {desc: "daemonStatus/dbFindNodes", doCtx: d.dbFindNodes}, - {desc: "daemonStatus/dataToNodeFrozen", doCtx: d.dataToNodeFrozen}, + {desc: "daemonStatus/dropPending", do: d.dropPending}, + {desc: "daemonStatus/dbNow", do: d.dbNow}, + {desc: "daemonStatus/getChanges", do: d.getChanges}, + {desc: "daemonStatus/getData", do: d.getData}, + {desc: "daemonStatus/dbCheckClusterIDForNodeID", do: d.dbCheckClusterIDForNodeID}, + {desc: "daemonStatus/dbCheckClusters", do: d.dbCheckClusters}, + {desc: "daemonStatus/dbFindNodes", do: d.dbFindNodes}, + {desc: "daemonStatus/dataToNodeFrozen", do: d.dataToNodeFrozen}, {desc: "daemonStatus/dataToNodeHeartbeat", do: d.dataToNodeHeartbeat}, - {desc: "daemonStatus/heartbeatToDB", doCtx: d.heartbeatToDB}, - {desc: "daemonStatus/dbFindServices", doCtx: d.dbFindServices}, - {desc: "daemonStatus/dbCreateServices", doCtx: d.dbCreateServices}, - {desc: "daemonStatus/dbFindInstances", doCtx: d.dbFindInstances}, - {desc: "daemonStatus/dbUpdateServices", doCtx: d.dbUpdateServices}, - {desc: "daemonStatus/dbUpdateInstances", doCtx: d.dbUpdateInstances}, - {desc: "daemonStatus/dbPurgeInstances", doCtx: d.dbPurgeInstances}, - {desc: "daemonStatus/dbPurgeServices", doCtx: d.dbPurgeServices}, - {desc: "daemonStatus/cacheObjectsWithoutConfig", doCtx: d.cacheObjectsWithoutConfig}, - {desc: "daemonStatus/pushFromTableChanges", doCtx: d.pushFromTableChanges}, + {desc: "daemonStatus/heartbeatToDB", do: d.heartbeatToDB}, + {desc: "daemonStatus/dbFindServices", do: d.dbFindServices}, + {desc: "daemonStatus/dbCreateServices", do: d.dbCreateServices}, + {desc: "daemonStatus/dbFindInstances", do: d.dbFindInstances}, + {desc: "daemonStatus/dbUpdateServices", do: d.dbUpdateServices}, + {desc: "daemonStatus/dbUpdateInstances", do: d.dbUpdateInstances}, + {desc: "daemonStatus/dbPurgeInstances", do: d.dbPurgeInstances}, + {desc: "daemonStatus/dbPurgeServices", do: d.dbPurgeServices}, + {desc: "daemonStatus/cacheObjectsWithoutConfig", do: d.cacheObjectsWithoutConfig}, + {desc: "daemonStatus/pushFromTableChanges", do: d.pushFromTableChanges}, } } @@ -293,7 +293,7 @@ func (d *jobFeedDaemonStatus) dataToNodeFrozen(ctx context.Context) error { return nil } -func (d *jobFeedDaemonStatus) dataToNodeHeartbeat() error { +func (d *jobFeedDaemonStatus) dataToNodeHeartbeat(_ context.Context) error { for nodeID, dbNode := range d.byNodeID { nodename := dbNode.Nodename l, err := d.data.nodeHeartbeat(nodename) diff --git a/worker/job_feed_instance_resource_info.go b/worker/job_feed_instance_resource_info.go index b810044..8d39d31 100644 --- a/worker/job_feed_instance_resource_info.go +++ b/worker/job_feed_instance_resource_info.go @@ -57,12 +57,12 @@ func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFe func (j *jobFeedInstanceResourceInfo) Operations() []operation { return []operation{ - {desc: "instanceResourceInfo/dropPending", doCtx: j.dropPending}, - {desc: "instanceResourceInfo/getData", doCtx: j.getData}, - {desc: "instanceResourceInfo/dbNow", doCtx: j.dbNow}, - {desc: "instanceResourceInfo/updateDB", doCtx: j.updateDB}, - {desc: "instanceResourceInfo/purgeDB", doCtx: j.purgeDB}, - {desc: "instanceResourceInfo/pushFromTableChanges", doCtx: j.pushFromTableChanges}, + {desc: "instanceResourceInfo/dropPending", do: j.dropPending}, + {desc: "instanceResourceInfo/getData", do: j.getData}, + {desc: "instanceResourceInfo/dbNow", do: j.dbNow}, + {desc: "instanceResourceInfo/updateDB", do: j.updateDB}, + {desc: "instanceResourceInfo/purgeDB", do: j.purgeDB}, + {desc: "instanceResourceInfo/pushFromTableChanges", do: j.pushFromTableChanges}, } } diff --git a/worker/job_feed_instance_status.go b/worker/job_feed_instance_status.go index ab11e85..41003ec 100644 --- a/worker/job_feed_instance_status.go +++ b/worker/job_feed_instance_status.go @@ -63,14 +63,14 @@ func newInstanceStatus(objectName, nodeID, clusterID string) *jobFeedInstanceSta func (d *jobFeedInstanceStatus) Operations() []operation { return []operation{ - {desc: "instanceStatus/dropPending", doCtx: d.dropPending}, - {desc: "instanceStatus/findNodeFromDb", doCtx: d.findNodeFromDb}, - {desc: "instanceStatus/getData", doCtx: d.getData}, - {desc: "instanceStatus/dbNow", doCtx: d.dbNow}, - {desc: "instanceStatus/findObjectFromDb", doCtx: d.findObjectFromDb}, - {desc: "instanceStatus/updateDB", doCtx: d.updateDB}, - {desc: "instanceStatus/pushFromTableChanges", doCtx: d.pushFromTableChanges}, - {desc: "instanceStatus/processed", doCtx: d.processed}, + {desc: "instanceStatus/dropPending", do: d.dropPending}, + {desc: "instanceStatus/findNodeFromDb", do: d.findNodeFromDb}, + {desc: "instanceStatus/getData", do: d.getData}, + {desc: "instanceStatus/dbNow", do: d.dbNow}, + {desc: "instanceStatus/findObjectFromDb", do: d.findObjectFromDb}, + {desc: "instanceStatus/updateDB", do: d.updateDB}, + {desc: "instanceStatus/pushFromTableChanges", do: d.pushFromTableChanges}, + {desc: "instanceStatus/processed", do: d.processed}, } } diff --git a/worker/job_feed_node_disk.go b/worker/job_feed_node_disk.go index 1d2e27b..4b4d662 100644 --- a/worker/job_feed_node_disk.go +++ b/worker/job_feed_node_disk.go @@ -44,11 +44,11 @@ func newNodeDisk(nodename, nodeID, clusterID string) *jobFeedNodeDisk { func (d *jobFeedNodeDisk) Operations() []operation { return []operation{ - {desc: "nodeDisk/dropPending", doCtx: d.dropPending}, - {desc: "nodeDisk/getData", doCtx: d.getData}, - {desc: "nodeDisk/dbNow", doCtx: d.dbNow}, - {desc: "nodeDisk/updateDB", doCtx: d.updateDB}, - {desc: "nodeDisk/pushFromTableChanges", doCtx: d.pushFromTableChanges}, + {desc: "nodeDisk/dropPending", do: d.dropPending}, + {desc: "nodeDisk/getData", do: d.getData}, + {desc: "nodeDisk/dbNow", do: d.dbNow}, + {desc: "nodeDisk/updateDB", do: d.updateDB}, + {desc: "nodeDisk/pushFromTableChanges", do: d.pushFromTableChanges}, } } diff --git a/worker/job_feed_object_config.go b/worker/job_feed_object_config.go index 59ee306..f371697 100644 --- a/worker/job_feed_object_config.go +++ b/worker/job_feed_object_config.go @@ -57,11 +57,11 @@ func newFeedObjectConfig(objectName, nodeID, clusterID string) *jobFeedObjectCon func (d *jobFeedObjectConfig) Operations() []operation { return []operation{ - {desc: "objectConfig/dropPending", doCtx: d.dropPending}, - {desc: "objectConfig/getData", doCtx: d.getData}, - {desc: "objectConfig/dbNow", doCtx: d.dbNow}, - {desc: "objectConfig/updateDB", doCtx: d.updateDB}, - {desc: "objectConfig/pushFromTableChanges", doCtx: d.pushFromTableChanges}, + {desc: "objectConfig/dropPending", do: d.dropPending}, + {desc: "objectConfig/getData", do: d.getData}, + {desc: "objectConfig/dbNow", do: d.dbNow}, + {desc: "objectConfig/updateDB", do: d.updateDB}, + {desc: "objectConfig/pushFromTableChanges", do: d.pushFromTableChanges}, } } diff --git a/worker/job_feed_system.go b/worker/job_feed_system.go index 55dc920..df8a98d 100644 --- a/worker/job_feed_system.go +++ b/worker/job_feed_system.go @@ -45,17 +45,17 @@ func (d *jobFeedSystem) Operations() []operation { } } return []operation{ - {desc: "system/dropPending", doCtx: d.dropPending}, - {desc: "system/getData", doCtx: d.getData}, - {desc: "system/dbNow", doCtx: d.dbNow}, - {desc: "system/hardware", doCtx: d.hardware, condition: hasProp("hardware"), blocking: true}, - {desc: "system/properties", doCtx: d.properties, condition: hasProp("properties"), blocking: true}, - {desc: "system/groups", doCtx: d.groups, condition: hasProp("gids"), blocking: true}, - {desc: "system/users", doCtx: d.users, condition: hasProp("uids"), blocking: true}, - {desc: "system/lan", doCtx: d.lan, condition: hasProp("lan"), blocking: true}, - {desc: "system/hba", doCtx: d.hba, condition: hasProp("hba"), blocking: true}, - {desc: "system/targets", doCtx: d.targets, condition: hasProp("targets"), blocking: true}, - {desc: "system/package", doCtx: d.pkg, condition: hasProp("package"), blocking: true}, + {desc: "system/dropPending", do: d.dropPending}, + {desc: "system/getData", do: d.getData}, + {desc: "system/dbNow", do: d.dbNow}, + {desc: "system/hardware", do: d.hardware, condition: hasProp("hardware"), blocking: true}, + {desc: "system/properties", do: d.properties, condition: hasProp("properties"), blocking: true}, + {desc: "system/groups", do: d.groups, condition: hasProp("gids"), blocking: true}, + {desc: "system/users", do: d.users, condition: hasProp("uids"), blocking: true}, + {desc: "system/lan", do: d.lan, condition: hasProp("lan"), blocking: true}, + {desc: "system/hba", do: d.hba, condition: hasProp("hba"), blocking: true}, + {desc: "system/targets", do: d.targets, condition: hasProp("targets"), blocking: true}, + {desc: "system/package", do: d.pkg, condition: hasProp("package"), blocking: true}, } }