diff --git a/cdb/db_dashboard.go b/cdb/db_dashboard.go
index 748399d..980723a 100644
--- a/cdb/db_dashboard.go
+++ b/cdb/db_dashboard.go
@@ -2,8 +2,11 @@ package cdb
 
 import (
 	"context"
+	"crypto/md5"
 	"database/sql"
+	"encoding/json"
 	"fmt"
+	"strings"
 	"time"
 )
 
@@ -782,3 +785,217 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error {
 	}
 	return nil
 }
+
+func (oDb *DB) DashboardUpdatePkgDiffForNode(ctx context.Context, nodeID string) error {
+	request := `SET @now = NOW()`
+	_, err := oDb.DB.ExecContext(ctx, request)
+	if err != nil {
+		return err
+	}
+
+	processSvcID := func(svcID, monSvctype, monVmtype string) error {
+		var query string
+		if monVmtype != "" {
+			// encap peers
+			query = `
+				SELECT DISTINCT nodes.node_id, nodes.nodename
+				FROM svcmon
+				JOIN nodes ON svcmon.node_id = nodes.node_id
+				WHERE svcmon.svc_id = ?
+				AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 20 MINUTE)
+				AND svcmon.mon_vmtype != ""
+				ORDER BY nodes.nodename
+			`
+		} else {
+			// non-encap peers
+			query = `
+				SELECT DISTINCT nodes.node_id, nodes.nodename
+				FROM svcmon
+				JOIN nodes ON svcmon.node_id = nodes.node_id
+				WHERE svcmon.svc_id = ?
+				AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 20 MINUTE)
+				AND svcmon.mon_vmtype = ""
+				ORDER BY nodes.nodename
+			`
+		}
+
+		rows, err := oDb.DB.QueryContext(ctx, query, svcID)
+		if err != nil {
+			return fmt.Errorf("failed to query nodes: %v", err)
+		}
+		defer rows.Close()
+
+		var nodeIDs []string
+		var nodenames []string
+		for rows.Next() {
+			var nodeID string
+			var nodename string
+			if err := rows.Scan(&nodeID, &nodename); err != nil {
+				return fmt.Errorf("failed to scan node row: %v", err)
+			}
+			nodeIDs = append(nodeIDs, nodeID)
+			nodenames = append(nodenames, nodename)
+		}
+
+		if len(nodeIDs) < 2 {
+			return nil
+		}
+
+		// Count pkg diffs
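+		// A package version is common to the cluster when every peer reports
+		// the same (pkg_name, pkg_version, pkg_arch, pkg_type) tuple; count
+		// the tuples reported by only a subset of the peers.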
(+%d)", skip) + } + nodesStr += trail + + // Format dash_dict JSON content + dashDict := map[string]any{ + "n": pkgDiffCount, + "nodes": nodesStr, + } + dashDictJSON, err := json.Marshal(dashDict) + if err != nil { + return fmt.Errorf("failed to marshal dash_dict: %v", err) + } + + dashDictMD5 := fmt.Sprintf("%x", md5.Sum(dashDictJSON)) + + query = ` + INSERT INTO dashboard + SET + dash_type = "package differences in cluster", + svc_id = ?, + node_id = "", + dash_severity = ?, + dash_fmt = "%(n)s package differences in cluster %(nodes)s", + dash_dict = ?, + dash_dict_md5 = ?, + dash_created = @now, + dash_updated = @now, + dash_env = ? + ON DUPLICATE KEY UPDATE + dash_severity = ?, + dash_fmt = "%(n)s package differences in cluster %(nodes)s", + dash_dict = ?, + dash_dict_md5 = ?, + dash_updated = @now, + dash_env = ? + ` + + _, err = oDb.DB.ExecContext(ctx, query, + svcID, sev, dashDictJSON, dashDictMD5, monSvctype, + sev, dashDictJSON, dashDictMD5, monSvctype, + ) + if err != nil { + return fmt.Errorf("failed to insert/update dashboard: %v", err) + } + + return nil + } + + // Get the list of svc_id for instances having recently updated status + rows, err := oDb.DB.QueryContext(ctx, ` + SELECT svcmon.svc_id, svcmon.mon_svctype, svcmon.mon_vmtype + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.node_id = ? AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 19 MINUTE) + `, nodeID) + if err != nil { + return fmt.Errorf("failed to query svcmon: %v", err) + } + defer rows.Close() + + var svcIDs []any + todo := make(map[string]func() error) + for rows.Next() { + var svcID string + var monSvctype, monVmtype sql.NullString + if err := rows.Scan(&svcID, &monSvctype, &monVmtype); err != nil { + return fmt.Errorf("failed to scan svcmon row: %v", err) + } + + // Remember which svc_id needs non-updated alert clean up + svcIDs = append(svcIDs, svcID) + + // Defer after rows.Close() to avoid busy db conn errors + todo[svcID] = func() error { + return processSvcID(svcID, monSvctype.String, monVmtype.String) + } + } + rows.Close() + for svcID, fn := range todo { + if err := fn(); err != nil { + return fmt.Errorf("failed to process svc_id %s: %v", svcID, err) + } + } + + // Clean up non updated alerts + if len(svcIDs) > 0 { + query := fmt.Sprintf(` + DELETE FROM dashboard + WHERE svc_id IN (%s) + AND dash_type = "package differences in cluster" + AND dash_updated < @now + `, Placeholders(len(svcIDs))) + + _, err := oDb.DB.ExecContext(ctx, query, svcIDs...) + if err != nil { + return fmt.Errorf("failed to delete old dashboard entries: %v", err) + } + } + + return nil +} diff --git a/cdb/db_nodes.go b/cdb/db_nodes.go index 6c3f85c..df3619d 100644 --- a/cdb/db_nodes.go +++ b/cdb/db_nodes.go @@ -437,3 +437,64 @@ func (oDb *DB) UpdateVirtualAssets(ctx context.Context) error { } return nil } + +func (oDb *DB) UpdateVirtualAsset(ctx context.Context, svcID, nodeID string) error { + request := ` + UPDATE nodes n + JOIN ( + SELECT + svcmon.mon_vmname AS vmname, + services.svc_app AS svc_app, + nodes.app AS node_app, + nodes.loc_addr, + nodes.loc_city, + nodes.loc_zip, + nodes.loc_room, + nodes.loc_building, + nodes.loc_floor, + nodes.loc_rack, + nodes.power_cabinet1, + nodes.power_cabinet2, + nodes.power_supply_nb, + nodes.power_protect, + nodes.power_protect_breaker, + nodes.power_breaker1, + nodes.power_breaker2, + nodes.loc_country, + nodes.enclosure + FROM svcmon + JOIN services ON svcmon.svc_id = services.svc_id + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.svc_id = ? 
+func (oDb *DB) UpdateVirtualAsset(ctx context.Context, svcID, nodeID string) error {
+	request := `
+		UPDATE nodes n
+		JOIN (
+			SELECT
+				svcmon.mon_vmname AS vmname,
+				services.svc_app AS svc_app,
+				nodes.app AS node_app,
+				nodes.loc_addr,
+				nodes.loc_city,
+				nodes.loc_zip,
+				nodes.loc_room,
+				nodes.loc_building,
+				nodes.loc_floor,
+				nodes.loc_rack,
+				nodes.power_cabinet1,
+				nodes.power_cabinet2,
+				nodes.power_supply_nb,
+				nodes.power_protect,
+				nodes.power_protect_breaker,
+				nodes.power_breaker1,
+				nodes.power_breaker2,
+				nodes.loc_country,
+				nodes.enclosure
+			FROM svcmon
+			JOIN services ON svcmon.svc_id = services.svc_id
+			JOIN nodes ON svcmon.node_id = nodes.node_id
+			WHERE svcmon.svc_id = ? AND svcmon.node_id = ?
+		) AS source
+		SET
+			n.loc_addr = COALESCE(NULLIF(source.loc_addr, ''), n.loc_addr),
+			n.loc_city = COALESCE(NULLIF(source.loc_city, ''), n.loc_city),
+			n.loc_zip = COALESCE(NULLIF(source.loc_zip, ''), n.loc_zip),
+			n.loc_room = COALESCE(NULLIF(source.loc_room, ''), n.loc_room),
+			n.loc_building = COALESCE(NULLIF(source.loc_building, ''), n.loc_building),
+			n.loc_floor = COALESCE(NULLIF(source.loc_floor, ''), n.loc_floor),
+			n.loc_rack = COALESCE(NULLIF(source.loc_rack, ''), n.loc_rack),
+			n.power_cabinet1 = COALESCE(NULLIF(source.power_cabinet1, ''), n.power_cabinet1),
+			n.power_cabinet2 = COALESCE(NULLIF(source.power_cabinet2, ''), n.power_cabinet2),
+			n.power_supply_nb = COALESCE(NULLIF(source.power_supply_nb, ''), n.power_supply_nb),
+			n.power_protect = COALESCE(NULLIF(source.power_protect, ''), n.power_protect),
+			n.power_protect_breaker = COALESCE(NULLIF(source.power_protect_breaker, ''), n.power_protect_breaker),
+			n.power_breaker1 = COALESCE(NULLIF(source.power_breaker1, ''), n.power_breaker1),
+			n.power_breaker2 = COALESCE(NULLIF(source.power_breaker2, ''), n.power_breaker2),
+			n.loc_country = COALESCE(NULLIF(source.loc_country, ''), n.loc_country),
+			n.enclosure = COALESCE(NULLIF(source.enclosure, ''), n.enclosure)
+		WHERE
+			n.nodename = source.vmname AND
+			n.app IN (source.svc_app, source.node_app)`
+	result, err := oDb.DB.ExecContext(ctx, request, svcID, nodeID)
+	if err != nil {
+		return err
+	}
+	if rowAffected, err := result.RowsAffected(); err != nil {
+		return err
+	} else if rowAffected > 0 {
+		oDb.SetChange("nodes")
+	}
+	return nil
+}
diff --git a/scheduler/main.go b/scheduler/main.go
index 7081f81..ed88667 100644
--- a/scheduler/main.go
+++ b/scheduler/main.go
@@ -44,6 +44,10 @@ func (t *Scheduler) Debugf(format string, args ...any) {
 
 func (t *Scheduler) toggleTasks(ctx context.Context, states map[string]State) {
 	for _, task := range Tasks {
+		if task.period == 0 {
+			//task.Debugf("skip: no period")
+			continue
+		}
 		name := task.Name()
 		storedState, _ := states[name]
 		cachedState, hasCachedState := t.states[name]
diff --git a/scheduler/task_alerts.go b/scheduler/task_alerts.go
index 56daf20..b6ddf06 100644
--- a/scheduler/task_alerts.go
+++ b/scheduler/task_alerts.go
@@ -16,7 +16,7 @@ var TaskAlert1M = Task{
 		TaskAlertInstancesNotUpdated,
 	},
 	period:  time.Minute,
-	timeout: 15 * time.Minute,
+	timeout: 15 * time.Second,
 }
 
 var TaskAlert1H = Task{
diff --git a/scheduler/task_metrics.go b/scheduler/task_metrics.go
index 9c2f09f..d608fca 100644
--- a/scheduler/task_metrics.go
+++ b/scheduler/task_metrics.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"database/sql"
 	"errors"
 	"fmt"
-	"os"
 	"path/filepath"
 	"strings"
 	"time"
 )
@@ -25,21 +24,12 @@
 }
 
 func MakeWSPFilename(format string, args ...any) (string, error) {
-	var dir string
-	candidates := viper.GetStringSlice("scheduler.task.metrics.directories")
-	if len(candidates) == 0 {
-		return "", fmt.Errorf("scheduler.task.metrics.directories is not set")
+	directory := viper.GetString("scheduler.directories.uploads")
+	if directory == "" {
+		return "", fmt.Errorf("define scheduler.directories.uploads")
 	}
-	for _, d := range candidates {
-		if _, err := os.Stat(d); err == nil {
-			dir = d
-			break
-		}
-	}
-	if dir == "" {
-		return "", fmt.Errorf("scheduler.task.metrics.directories has no existing entry")
-	}
-	return filepath.Join(dir, fmt.Sprintf(format+".wsp", args...)), nil
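+	// Whisper files live in the stats/ subtree of the uploads directory.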
+	return filepath.Join(directory, "stats", fmt.Sprintf(format+".wsp", args...)), nil
 }
 
 func taskMetrics(ctx context.Context, task *Task) error {
diff --git a/scheduler/task_nodes.go b/scheduler/task_nodes.go
index 28caa2c..c2c88f4 100644
--- a/scheduler/task_nodes.go
+++ b/scheduler/task_nodes.go
@@ -8,7 +8,7 @@ import (
 var TaskUpdateVirtualAssets = Task{
 	name:    "update_virtual_assets",
 	fn:      taskUpdateVirtualAssets,
-	timeout: time.Minute,
+	timeout: 10 * time.Second,
 }
 
 func taskUpdateVirtualAssets(ctx context.Context, task *Task) error {
diff --git a/scheduler/task_scrub.go b/scheduler/task_scrub.go
index da039d7..ad5a707 100644
--- a/scheduler/task_scrub.go
+++ b/scheduler/task_scrub.go
@@ -93,6 +93,12 @@ var TaskScrubSvcdisks = Task{
 	timeout: time.Minute,
 }
 
+var TaskScrubStatic = Task{
+	name:    "scrub_static",
+	fn:      taskScrubStatic,
+	timeout: time.Minute,
+}
+
 var TaskScrubTempviz = Task{
 	name:    "scrub_tempviz",
 	fn:      taskScrubTempviz,
 	timeout: time.Minute,
 }
@@ -157,6 +163,7 @@ var TaskScrub1D = Task{
 		TaskScrubPatches,
 		TaskScrubPdf,
 		TaskScrubResmon,
+		TaskScrubStatic,
 		TaskScrubStorArray,
 		TaskScrubSvcdisks,
 		TaskUpdateStorArrayDGQuota,
@@ -578,21 +585,14 @@ func taskUpdateStorArrayDGQuota(ctx context.Context, task *Task) error {
 	return odb.Commit()
 }
 
-func taskScrubTempviz(ctx context.Context, task *Task) error {
-	threshold := time.Now().Add(-1 * time.Hour)
-	directories := viper.GetStringSlice("scheduler.task.scrub_tempviz.directories")
-	if len(directories) == 0 {
-		slog.Warn("skip: define scheduler.task.scrub_tempviz.directories")
-		return nil
-	}
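+// scrubFiles removes the files matching pattern whose modification time is
+// older than threshold.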
+func scrubFiles(pattern string, threshold time.Time) error {
 	var matches []string
-	for _, directory := range directories {
-		pattern := filepath.Join(directory, "tempviz*")
-		if m, err := filepath.Glob(pattern); err != nil {
-			return fmt.Errorf("failed to glob files: %w", err)
-		} else {
-			matches = append(matches, m...)
-		}
+	if m, err := filepath.Glob(pattern); err != nil {
+		return fmt.Errorf("failed to glob files: %w", err)
+	} else {
+		matches = append(matches, m...)
 	}
 	for _, fpath := range matches {
 		fileInfo, err := os.Stat(fpath)
 		if err != nil {
 			return err
 		}
 		mtime := fileInfo.ModTime()
 		if mtime.Before(threshold) {
 			slog.Info(fmt.Sprintf("rm %s mtime %s", fpath, mtime))
 			if err := os.Remove(fpath); err != nil {
 				return fmt.Errorf("failed to rm %s: %w", fpath, err)
 			}
 		}
 	}
 	return nil
 }
@@ -610,36 +610,52 @@
-func taskScrubPdf(ctx context.Context, task *Task) error {
-	threshold := time.Now().Add(-24 * time.Hour)
-	directories := viper.GetStringSlice("scheduler.task.scrub_pdf.directories")
-	if len(directories) == 0 {
-		slog.Warn("skip: define scheduler.task.scrub_pdf.directories")
+func taskScrubStatic(ctx context.Context, task *Task) error {
+	threshold := time.Now().Add(-1 * time.Hour)
+	directory := viper.GetString("scheduler.directories.static")
+	if directory == "" {
+		slog.Warn("skip: define scheduler.directories.static")
 		return nil
 	}
-	var matches []string
-	for _, directory := range directories {
-		pattern := filepath.Join(directory, "*-*-*-*-*.pdf")
-		if m, err := filepath.Glob(pattern); err != nil {
-			return fmt.Errorf("failed to glob files: %w", err)
-		} else {
-			matches = append(matches, m...)
-		}
+	if err := scrubFiles(filepath.Join(directory, "tempviz*.png"), threshold); err != nil {
+		return err
 	}
-	for _, fpath := range matches {
-		fileInfo, err := os.Stat(fpath)
-		if err != nil {
-			return err
-		}
-		mtime := fileInfo.ModTime()
-		if mtime.Before(threshold) {
-			slog.Info(fmt.Sprintf("rm %s mtime %s", fpath, mtime))
-			if err := os.Remove(fpath); err != nil {
-				return fmt.Errorf("failed to rm %s: %w", fpath, err)
-			}
-		}
+	if err := scrubFiles(filepath.Join(directory, "tempviz*.dot"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "stats_*_[0-9]*.png"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "stat_*_[0-9]*.png"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "stats_*_[0-9]*.svg"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "*-*-*-*.pdf"), threshold); err != nil {
+		return err
 	}
 	return nil
+}
+
+func taskScrubTempviz(ctx context.Context, task *Task) error {
+	threshold := time.Now().Add(-1 * time.Hour)
+	directory := viper.GetString("scheduler.directories.static")
+	if directory == "" {
+		slog.Warn("skip: define scheduler.directories.static")
+		return nil
+	}
+	return scrubFiles(filepath.Join(directory, "tempviz*"), threshold)
+}
+
+func taskScrubPdf(ctx context.Context, task *Task) error {
+	threshold := time.Now().Add(-24 * time.Hour)
+	directory := viper.GetString("scheduler.directories.static")
+	if directory == "" {
+		slog.Warn("skip: define scheduler.directories.static")
+		return nil
+	}
+	return scrubFiles(filepath.Join(directory, "*-*-*-*-*.pdf"), threshold)
 }
 
 func taskScrubUnfinishedActions(ctx context.Context, task *Task) error {
diff --git a/worker/job.go b/worker/job.go
index 39c9054..8b1ff74 100644
--- a/worker/job.go
+++ b/worker/job.go
@@ -161,9 +161,10 @@ func runOps(ops ...operation) error {
 				With(prometheus.Labels{"desc": op.desc, "status": operationStatusFailed}).
 				Observe(duration.Seconds())
 			if op.blocking {
-				continue
+				return err
 			}
-			return err
+			slog.Warn(fmt.Sprintf("%s: non blocking error: %s", op.desc, err))
+			continue
 		}
 		operationDuration.
 			With(prometheus.Labels{"desc": op.desc, "status": operationStatusOk}).
diff --git a/worker/job_feed_system.go b/worker/job_feed_system.go
index 94f95fb..0c06413 100644
--- a/worker/job_feed_system.go
+++ b/worker/job_feed_system.go
@@ -100,7 +100,9 @@ func (d *jobFeedSystem) pkg() error {
 	} else {
 		defer rows.Close()
 	}
-
+	if err := d.oDb.DashboardUpdatePkgDiffForNode(d.ctx, nodeID); err != nil {
+		return err
+	}
 	return nil
 }