From 069df9544f07cb7a227a41c5bf29e30256efab2d Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Wed, 14 Jan 2026 13:22:07 +0100 Subject: [PATCH 1/9] Add the "alert_package_differences_in_cluster" scheduler task --- cdb/db_dashboard.go | 148 +++++++++++++++++++++++++++++++++++++++ scheduler/task_alerts.go | 22 ++++++ 2 files changed, 170 insertions(+) diff --git a/cdb/db_dashboard.go b/cdb/db_dashboard.go index 748399d..d8d8f20 100644 --- a/cdb/db_dashboard.go +++ b/cdb/db_dashboard.go @@ -782,3 +782,151 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error { } return nil } + +func (oDb *DB) DashboardUpdatePkgDiff(ctx context.Context) error { + request := `SET @now = NOW()` + result, err := oDb.DB.ExecContext(ctx, request) + if err != nil { + return err + } + + request = ` + INSERT INTO dashboard ( + dash_type, + svc_id, + node_id, + dash_severity, + dash_fmt, + dash_dict, + dash_dict_md5, + dash_created, + dash_updated, + dash_env + ) + WITH + pkg_cluster_counts AS ( + SELECT + nodes.cluster_id, + nodes.node_id, + packages.pkg_name, + packages.pkg_version, + packages.pkg_arch, + packages.pkg_type, + COUNT(DISTINCT nodes.node_id) AS node_count + FROM packages + JOIN nodes USING (node_id) + WHERE packages.pkg_name NOT LIKE 'gpg-pubkey%' + GROUP BY + nodes.cluster_id, + packages.pkg_name, + packages.pkg_version, + packages.pkg_arch, + packages.pkg_type + ), + + cluster_nodes_counts AS ( + SELECT + cluster_id, + COUNT(node_id) AS node_count, + GROUP_CONCAT(nodename SEPARATOR ", ") AS nodenames + FROM nodes + GROUP BY cluster_id + ), + + pkg_cluster_diffs AS ( + SELECT + pkg_cluster_counts.*, + cluster_nodes_counts.nodenames, + cluster_nodes_counts.node_count AS cluster_node_count + FROM pkg_cluster_counts + JOIN cluster_nodes_counts + USING (cluster_id) + WHERE + pkg_cluster_counts.node_count < cluster_nodes_counts.node_count + ), + + pkg_cluster_diffcount AS ( + SELECT + nodes.cluster_id, + nodes.nodename, + nodes.node_id, + pkg_cluster_diffs.nodenames, + COUNT(*) AS pkg_diffs, + GROUP_CONCAT(DISTINCT pkg_cluster_diffs.pkg_name SEPARATOR ", ") AS pkg_names + FROM pkg_cluster_diffs + JOIN nodes USING (node_id) + GROUP BY + nodes.cluster_id + ), + + alerts AS ( + SELECT + services.svcname, + svcmon.svc_id, + svcmon.mon_svctype, + pkg_cluster_diffcount.* + FROM svcmon + JOIN pkg_cluster_diffcount USING (node_id) + JOIN services USING (svc_id) + ) + + SELECT + 'package differences in cluster' AS dash_type, + a.svc_id, + NULL AS node_id, + IF(a.mon_svctype = "PRD", 1, 0) AS dash_severity, + "%(n)d package differences in cluster %(nodes)s" AS dash_fmt, + JSON_OBJECT( + 'n', a.pkg_diffs, + 'nodes', a.nodenames, + 'cluster_id', a.cluster_id + ) AS dash_dict, + MD5(CONCAT( + a.pkg_diffs, + a.nodenames, + a.cluster_id + )) AS dash_dict_md5, + @now AS dash_created, + @now AS dash_updated, + a.mon_svctype AS dash_env + FROM alerts a + + ON DUPLICATE KEY UPDATE + dash_severity = VALUES(dash_severity), + dash_fmt = VALUES(dash_fmt), + dash_dict = VALUES(dash_dict), + dash_dict_md5 = VALUES(dash_dict_md5), + dash_updated = VALUES(dash_updated), + dash_env = VALUES(dash_env) + ` + result, err = oDb.DB.ExecContext(ctx, request) + if err != nil { + return err + } + if rowAffected, err := result.RowsAffected(); err != nil { + return err + } else if rowAffected > 0 { + oDb.SetChange("dashboard") + } + + request = ` + DELETE FROM dashboard + WHERE + dash_type="package differences in cluster" AND + ( + dash_updated < @now or + dash_updated IS NULL + ) + ` + result, err = 
oDb.DB.ExecContext(ctx, request)
+	if err != nil {
+		return err
+	}
+	if rowAffected, err := result.RowsAffected(); err != nil {
+		return err
+	} else if rowAffected > 0 {
+		oDb.SetChange("dashboard")
+	}
+
+	return nil
+}
diff --git a/scheduler/task_alerts.go b/scheduler/task_alerts.go
index 56daf20..b0ab056 100644
--- a/scheduler/task_alerts.go
+++ b/scheduler/task_alerts.go
@@ -45,6 +45,7 @@ var TaskAlert1D = Task{
 		TaskAlertNodeCloseToMaintenanceEnd,
 		TaskAlertNodeMaintenanceExpired,
 		TaskAlertNodeWithoutMaintenanceEnd,
+		TaskAlertPackageDifferencesInCluster,
 		TaskAlertPurgeActionErrors,
 		TaskPurgeAlertsOnDeletedInstances,
 	},
@@ -130,6 +131,12 @@ var TaskLogInstancesNotUpdated = Task{
 	timeout: 5 * time.Minute,
 }
 
+var TaskAlertPackageDifferencesInCluster = Task{
+	name: "alert_package_differences_in_cluster",
+	fn: taskAlertPackageDifferencesInCluster,
+	timeout: 5 * time.Minute,
+}
+
 var TaskAlertPurgeActionErrors = Task{
 	name: "alert_purge_action_errors",
 	fn: taskAlertPurgeActionErrors,
 	timeout: 5 * time.Minute,
@@ -265,6 +272,21 @@ func taskLogInstancesNotUpdated(ctx context.Context, task *Task) error {
 	return odb.Commit()
 }
 
+func taskAlertPackageDifferencesInCluster(ctx context.Context, task *Task) error {
+	odb, err := task.DBX(ctx)
+	if err != nil {
+		return err
+	}
+	defer odb.Rollback()
+	if err := odb.DashboardUpdatePkgDiff(ctx); err != nil {
+		return err
+	}
+	if err := odb.Session.NotifyChanges(ctx); err != nil {
+		return err
+	}
+	return odb.Commit()
+}
+
 func taskAlertPurgeActionErrors(ctx context.Context, task *Task) error {
 	odb, err := task.DBX(ctx)
 	if err != nil {

From 1aa1a6aa7336de4a0f63f54ea0d3d01343f33383 Mon Sep 17 00:00:00 2001
From: Christophe Varoqui
Date: Thu, 15 Jan 2026 16:33:01 +0100
Subject: [PATCH 2/9] Fix the logging of errors emitted by the worker tasks

---
 worker/job.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/worker/job.go b/worker/job.go
index 39c9054..8b1ff74 100644
--- a/worker/job.go
+++ b/worker/job.go
@@ -161,9 +161,10 @@ func runOps(ops ...operation) error {
 				With(prometheus.Labels{"desc": op.desc, "status": operationStatusFailed}).
 				Observe(duration.Seconds())
 			if op.blocking {
-				continue
+				return err
 			}
-			return err
+			slog.Warn("non-blocking operation error", "desc", op.desc, "err", err)
+			continue
 		}
 		operationDuration.
 			With(prometheus.Labels{"desc": op.desc, "status": operationStatusOk}).
From 302fc88dc12e3a59616364843c41364c9537ed88 Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Thu, 15 Jan 2026 16:33:51 +0100 Subject: [PATCH 3/9] Fix the timeout of some scheduler tasks --- scheduler/task_alerts.go | 2 +- scheduler/task_nodes.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/task_alerts.go b/scheduler/task_alerts.go index b0ab056..b162525 100644 --- a/scheduler/task_alerts.go +++ b/scheduler/task_alerts.go @@ -16,7 +16,7 @@ var TaskAlert1M = Task{ TaskAlertInstancesNotUpdated, }, period: time.Minute, - timeout: 15 * time.Minute, + timeout: 15 * time.Second, } var TaskAlert1H = Task{ diff --git a/scheduler/task_nodes.go b/scheduler/task_nodes.go index 28caa2c..c2c88f4 100644 --- a/scheduler/task_nodes.go +++ b/scheduler/task_nodes.go @@ -8,7 +8,7 @@ import ( var TaskUpdateVirtualAssets = Task{ name: "update_virtual_assets", fn: taskUpdateVirtualAssets, - timeout: time.Minute, + timeout: 10 * time.Second, } func taskUpdateVirtualAssets(ctx context.Context, task *Task) error { From c59a0bf3a14fa70750b9bf3807e5e2f3d9001445 Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Thu, 15 Jan 2026 16:34:33 +0100 Subject: [PATCH 4/9] Don't schedule tasks with no period --- scheduler/main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scheduler/main.go b/scheduler/main.go index 7081f81..ed88667 100644 --- a/scheduler/main.go +++ b/scheduler/main.go @@ -44,6 +44,10 @@ func (t *Scheduler) Debugf(format string, args ...any) { func (t *Scheduler) toggleTasks(ctx context.Context, states map[string]State) { for _, task := range Tasks { + if task.period == 0 { + //task.Debugf("skip: no period") + continue + } name := task.Name() storedState, _ := states[name] cachedState, hasCachedState := t.states[name] From 6bbdfc5000e8b13bbc60d0722b7f1b9c8be59cab Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Thu, 15 Jan 2026 16:35:14 +0100 Subject: [PATCH 5/9] Add the UpdateVirtualAsset(ctx, svcID, nodeID) db helper --- cdb/db_nodes.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/cdb/db_nodes.go b/cdb/db_nodes.go index 6c3f85c..df3619d 100644 --- a/cdb/db_nodes.go +++ b/cdb/db_nodes.go @@ -437,3 +437,64 @@ func (oDb *DB) UpdateVirtualAssets(ctx context.Context) error { } return nil } + +func (oDb *DB) UpdateVirtualAsset(ctx context.Context, svcID, nodeID string) error { + request := ` + UPDATE nodes n + JOIN ( + SELECT + svcmon.mon_vmname AS vmname, + services.svc_app AS svc_app, + nodes.app AS node_app, + nodes.loc_addr, + nodes.loc_city, + nodes.loc_zip, + nodes.loc_room, + nodes.loc_building, + nodes.loc_floor, + nodes.loc_rack, + nodes.power_cabinet1, + nodes.power_cabinet2, + nodes.power_supply_nb, + nodes.power_protect, + nodes.power_protect_breaker, + nodes.power_breaker1, + nodes.power_breaker2, + nodes.loc_country, + nodes.enclosure + FROM svcmon + JOIN services ON svcmon.svc_id = services.svc_id + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.svc_id = ? AND svcmon.node_id = ? 
+ ) AS source + SET + n.loc_addr = COALESCE(NULLIF(source.loc_addr, ''), n.loc_addr), + n.loc_city = COALESCE(NULLIF(source.loc_city, ''), n.loc_city), + n.loc_zip = COALESCE(NULLIF(source.loc_zip, ''), n.loc_zip), + n.loc_room = COALESCE(NULLIF(source.loc_room, ''), n.loc_room), + n.loc_building = COALESCE(NULLIF(source.loc_building, ''), n.loc_building), + n.loc_floor = COALESCE(NULLIF(source.loc_floor, ''), n.loc_floor), + n.loc_rack = COALESCE(NULLIF(source.loc_rack, ''), n.loc_rack), + n.power_cabinet1 = COALESCE(NULLIF(source.power_cabinet1, ''), n.power_cabinet1), + n.power_cabinet2 = COALESCE(NULLIF(source.power_cabinet2, ''), n.power_cabinet2), + n.power_supply_nb = COALESCE(NULLIF(source.power_supply_nb, ''), n.power_supply_nb), + n.power_protect = COALESCE(NULLIF(source.power_protect, ''), n.power_protect), + n.power_protect_breaker = COALESCE(NULLIF(source.power_protect_breaker, ''), n.power_protect_breaker), + n.power_breaker1 = COALESCE(NULLIF(source.power_breaker1, ''), n.power_breaker1), + n.power_breaker2 = COALESCE(NULLIF(source.power_breaker2, ''), n.power_breaker2), + n.loc_country = COALESCE(NULLIF(source.loc_country, ''), n.loc_country), + n.enclosure = COALESCE(NULLIF(source.enclosure, ''), n.enclosure) + WHERE + n.nodename = source.vmname AND + n.app IN (source.svc_app, source.node_app)` + result, err := oDb.DB.ExecContext(ctx, request, svcID, nodeID) + if err != nil { + return err + } + if rowAffected, err := result.RowsAffected(); err != nil { + return err + } else if rowAffected > 0 { + oDb.SetChange("nodes") + } + return nil +} From c80cf33700a4c347b34a97e3a7da50ad1afdfc4d Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Thu, 15 Jan 2026 16:35:59 +0100 Subject: [PATCH 6/9] Add the DashboardUpdatePkgDiffForNode(ctx, nodeID) db helper Call it at the end of the pushpkg feed job. --- cdb/db_dashboard.go | 172 ++++++++++++++++++++++++++++++++++++++ worker/job_feed_system.go | 4 +- 2 files changed, 175 insertions(+), 1 deletion(-) diff --git a/cdb/db_dashboard.go b/cdb/db_dashboard.go index d8d8f20..e416524 100644 --- a/cdb/db_dashboard.go +++ b/cdb/db_dashboard.go @@ -930,3 +930,175 @@ func (oDb *DB) DashboardUpdatePkgDiff(ctx context.Context) error { return nil } + +// DashboardUpdatePkgDiffForNode refreshes only "package differences in cluster" alerts +// for the node's cluster. +func (oDb *DB) DashboardUpdatePkgDiffForNode(ctx context.Context, nodeID string) error { + request := `SET @now = NOW()` + _, err := oDb.DB.ExecContext(ctx, request) + if err != nil { + return fmt.Errorf("failed to set @now: %v", err) + } + request = ` + INSERT INTO dashboard ( + dash_type, + svc_id, + node_id, + dash_severity, + dash_fmt, + dash_dict, + dash_dict_md5, + dash_created, + dash_updated, + dash_env + ) + + WITH + svc_nodes AS ( + SELECT + svcmon.svc_id, + nodes.node_id, + nodes.nodename, + nodes.cluster_id + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE + svcmon.mon_updated > DATE_SUB(@now, INTERVAL 20 MINUTE) AND + svcmon.node_id = ? 
+ ), + + pkg_svc_counts AS ( + SELECT + svc_nodes.svc_id, + svc_nodes.node_id, + packages.pkg_name, + packages.pkg_version, + packages.pkg_arch, + packages.pkg_type, + COUNT(DISTINCT svc_nodes.node_id) AS node_count + FROM packages + JOIN svc_nodes ON packages.node_id = svc_nodes.node_id + WHERE packages.pkg_name NOT LIKE 'gpg-pubkey%' + GROUP BY + svc_nodes.svc_id, + packages.pkg_name, + packages.pkg_version, + packages.pkg_arch, + packages.pkg_type + ), + + svc_node_counts AS ( + SELECT + svc_id, + COUNT(node_id) AS node_count, + GROUP_CONCAT(nodename SEPARATOR ", ") AS nodenames + FROM svc_nodes + GROUP BY svc_id + ), + + pkg_svc_diffs AS ( + SELECT + pkg_svc_counts.*, + svc_node_counts.nodenames, + svc_node_counts.node_count AS svc_node_count + FROM pkg_svc_counts + JOIN svc_node_counts ON pkg_svc_counts.svc_id = svc_node_counts.svc_id + WHERE + pkg_svc_counts.node_count < svc_node_counts.node_count + ), + + pkg_svc_diffcount AS ( + SELECT + svc_nodes.svc_id, + svc_nodes.nodename, + svc_nodes.node_id, + svc_nodes.cluster_id, + pkg_svc_diffs.nodenames, + COUNT(*) AS pkg_diffs, + GROUP_CONCAT(DISTINCT pkg_svc_diffs.pkg_name SEPARATOR ", ") AS pkg_names + FROM pkg_svc_diffs + JOIN svc_nodes ON pkg_svc_diffs.node_id = svc_nodes.node_id + GROUP BY + svc_nodes.svc_id, + svc_nodes.nodename, + svc_nodes.node_id, + svc_nodes.cluster_id, + pkg_svc_diffs.nodenames + ), + + alerts AS ( + SELECT + services.svcname, + svcmon.mon_svctype, + pkg_svc_diffcount.* + FROM pkg_svc_diffcount + JOIN svcmon ON pkg_svc_diffcount.node_id = svcmon.node_id AND pkg_svc_diffcount.svc_id = svcmon.svc_id + JOIN services ON pkg_svc_diffcount.svc_id = services.svc_id + ) + + SELECT + 'package differences in service' AS dash_type, + a.svc_id, + NULL AS node_id, + IF(a.mon_svctype = 'PRD', 1, 0) AS dash_severity, + CONCAT(a.pkg_diffs, ' package differences in service ', a.nodenames) AS dash_fmt, + JSON_OBJECT( + 'n', a.pkg_diffs, + 'nodes', a.nodenames, + 'svc_id', a.svc_id, + 'cluster_id', a.cluster_id + ) AS dash_dict, + MD5(CONCAT( + a.pkg_diffs, + a.nodenames, + a.svc_id, + a.cluster_id + )) AS dash_dict_md5, + @now AS dash_created, + @now AS dash_updated, + a.mon_svctype AS dash_env + FROM alerts a + ON DUPLICATE KEY UPDATE + dash_severity = VALUES(dash_severity), + dash_fmt = VALUES(dash_fmt), + dash_dict = VALUES(dash_dict), + dash_dict_md5 = VALUES(dash_dict_md5), + dash_updated = VALUES(dash_updated), + dash_env = VALUES(dash_env); + ` + + result, err := oDb.DB.ExecContext(ctx, request, nodeID) + if err != nil { + return fmt.Errorf("failed to update dashboard: %v", err) + } + + rowAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get affected rows: %v", err) + } else if rowAffected > 0 { + oDb.SetChange("dashboard") + } + + request = ` + DELETE d FROM dashboard d + LEFT JOIN svcmon m ON d.svc_id=m.svc_id + WHERE + m.node_id = ? 
AND + dash_type = "package differences in cluster" AND + dash_updated < @now + ` + + result, err = oDb.DB.ExecContext(ctx, request, nodeID) + if err != nil { + return fmt.Errorf("failed to delete old dashboard entries: %v", err) + } + + rowAffected, err = result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get affected rows: %v", err) + } else if rowAffected > 0 { + oDb.SetChange("dashboard") + } + + return nil +} diff --git a/worker/job_feed_system.go b/worker/job_feed_system.go index 94f95fb..0c06413 100644 --- a/worker/job_feed_system.go +++ b/worker/job_feed_system.go @@ -100,7 +100,9 @@ func (d *jobFeedSystem) pkg() error { } else { defer rows.Close() } - + if err := d.oDb.DashboardUpdatePkgDiffForNode(d.ctx, nodeID); err != nil { + return err + } return nil } From 1171bf800344f2b318ae4fbaeec91e95f6c033d3 Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Fri, 16 Jan 2026 10:03:51 +0100 Subject: [PATCH 7/9] Reimplement the DashboardUpdatePkgDiffForNode() db helper For speed by using sql simpler exec plans, at the cost of more requests. --- cdb/db_dashboard.go | 334 +++++++++++++++++++++++++------------------- 1 file changed, 188 insertions(+), 146 deletions(-) diff --git a/cdb/db_dashboard.go b/cdb/db_dashboard.go index e416524..c114845 100644 --- a/cdb/db_dashboard.go +++ b/cdb/db_dashboard.go @@ -2,8 +2,11 @@ package cdb import ( "context" + "crypto/md5" "database/sql" + "encoding/json" "fmt" + "strings" "time" ) @@ -931,173 +934,212 @@ func (oDb *DB) DashboardUpdatePkgDiff(ctx context.Context) error { return nil } -// DashboardUpdatePkgDiffForNode refreshes only "package differences in cluster" alerts -// for the node's cluster. func (oDb *DB) DashboardUpdatePkgDiffForNode(ctx context.Context, nodeID string) error { request := `SET @now = NOW()` _, err := oDb.DB.ExecContext(ctx, request) if err != nil { - return fmt.Errorf("failed to set @now: %v", err) + return err } - request = ` - INSERT INTO dashboard ( - dash_type, - svc_id, - node_id, - dash_severity, - dash_fmt, - dash_dict, - dash_dict_md5, - dash_created, - dash_updated, - dash_env - ) - WITH - svc_nodes AS ( - SELECT - svcmon.svc_id, - nodes.node_id, - nodes.nodename, - nodes.cluster_id - FROM svcmon - JOIN nodes ON svcmon.node_id = nodes.node_id - WHERE - svcmon.mon_updated > DATE_SUB(@now, INTERVAL 20 MINUTE) AND - svcmon.node_id = ? - ), + processSvcID := func(svcID, monSvctype, monVmtype string) error { + var query string + if monVmtype != "" { + // encap peers + query = ` + SELECT DISTINCT nodes.node_id, nodes.nodename + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.svc_id = ? + AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 20 MINUTE) + AND svcmon.mon_vmtype != "" + ORDER BY nodes.nodename + ` + } else { + // non-encap peers + query = ` + SELECT DISTINCT nodes.node_id, nodes.nodename + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.svc_id = ? 
+ AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 20 MINUTE) + AND svcmon.mon_vmtype = "" + ORDER BY nodes.nodename + ` + } - pkg_svc_counts AS ( - SELECT - svc_nodes.svc_id, - svc_nodes.node_id, - packages.pkg_name, - packages.pkg_version, - packages.pkg_arch, - packages.pkg_type, - COUNT(DISTINCT svc_nodes.node_id) AS node_count - FROM packages - JOIN svc_nodes ON packages.node_id = svc_nodes.node_id - WHERE packages.pkg_name NOT LIKE 'gpg-pubkey%' - GROUP BY - svc_nodes.svc_id, - packages.pkg_name, - packages.pkg_version, - packages.pkg_arch, - packages.pkg_type - ), + rows, err := oDb.DB.QueryContext(ctx, query, svcID) + if err != nil { + return fmt.Errorf("failed to query nodes: %v", err) + } + defer rows.Close() - svc_node_counts AS ( - SELECT - svc_id, - COUNT(node_id) AS node_count, - GROUP_CONCAT(nodename SEPARATOR ", ") AS nodenames - FROM svc_nodes - GROUP BY svc_id - ), + var nodeIDs []string + var nodenames []string + for rows.Next() { + var nodeID string + var nodename string + if err := rows.Scan(&nodeID, &nodename); err != nil { + return fmt.Errorf("failed to scan node row: %v", err) + } + nodeIDs = append(nodeIDs, nodeID) + nodenames = append(nodenames, nodename) + } - pkg_svc_diffs AS ( - SELECT - pkg_svc_counts.*, - svc_node_counts.nodenames, - svc_node_counts.node_count AS svc_node_count - FROM pkg_svc_counts - JOIN svc_node_counts ON pkg_svc_counts.svc_id = svc_node_counts.svc_id - WHERE - pkg_svc_counts.node_count < svc_node_counts.node_count - ), + if len(nodeIDs) < 2 { + return nil + } - pkg_svc_diffcount AS ( - SELECT - svc_nodes.svc_id, - svc_nodes.nodename, - svc_nodes.node_id, - svc_nodes.cluster_id, - pkg_svc_diffs.nodenames, - COUNT(*) AS pkg_diffs, - GROUP_CONCAT(DISTINCT pkg_svc_diffs.pkg_name SEPARATOR ", ") AS pkg_names - FROM pkg_svc_diffs - JOIN svc_nodes ON pkg_svc_diffs.node_id = svc_nodes.node_id - GROUP BY - svc_nodes.svc_id, - svc_nodes.nodename, - svc_nodes.node_id, - svc_nodes.cluster_id, - pkg_svc_diffs.nodenames - ), + // Count pkg diffs + var pkgDiffCount int + placeholders := make([]string, len(nodeIDs)) + args := make([]any, len(nodeIDs)) + for i, id := range nodeIDs { + placeholders[i] = "?" + args[i] = id + } + query = fmt.Sprintf(` + SELECT COUNT(pkg_name) + FROM ( + SELECT + pkg_name, + pkg_version, + pkg_arch, + pkg_type, + COUNT(DISTINCT node_id) AS c + FROM packages + WHERE + node_id IN (%s) + AND pkg_name NOT LIKE "gpg-pubkey%%" + GROUP BY + pkg_name, + pkg_version, + pkg_arch, + pkg_type + ) AS t + WHERE t.c != ? 
+ `, strings.Join(placeholders, ",")) + args = append(args, len(nodeIDs)) - alerts AS ( - SELECT - services.svcname, - svcmon.mon_svctype, - pkg_svc_diffcount.* - FROM pkg_svc_diffcount - JOIN svcmon ON pkg_svc_diffcount.node_id = svcmon.node_id AND pkg_svc_diffcount.svc_id = svcmon.svc_id - JOIN services ON pkg_svc_diffcount.svc_id = services.svc_id - ) + err = oDb.DB.QueryRowContext(ctx, query, args...).Scan(&pkgDiffCount) + if err != nil { + return fmt.Errorf("failed to count package differences: %v", err) + } - SELECT - 'package differences in service' AS dash_type, - a.svc_id, - NULL AS node_id, - IF(a.mon_svctype = 'PRD', 1, 0) AS dash_severity, - CONCAT(a.pkg_diffs, ' package differences in service ', a.nodenames) AS dash_fmt, - JSON_OBJECT( - 'n', a.pkg_diffs, - 'nodes', a.nodenames, - 'svc_id', a.svc_id, - 'cluster_id', a.cluster_id - ) AS dash_dict, - MD5(CONCAT( - a.pkg_diffs, - a.nodenames, - a.svc_id, - a.cluster_id - )) AS dash_dict_md5, - @now AS dash_created, - @now AS dash_updated, - a.mon_svctype AS dash_env - FROM alerts a - ON DUPLICATE KEY UPDATE - dash_severity = VALUES(dash_severity), - dash_fmt = VALUES(dash_fmt), - dash_dict = VALUES(dash_dict), - dash_dict_md5 = VALUES(dash_dict_md5), - dash_updated = VALUES(dash_updated), - dash_env = VALUES(dash_env); - ` + if pkgDiffCount == 0 { + return nil + } - result, err := oDb.DB.ExecContext(ctx, request, nodeID) - if err != nil { - return fmt.Errorf("failed to update dashboard: %v", err) + sev := 0 + if monSvctype == "PRD" { + sev = 1 + } + + // truncate too long node names list + skip := 0 + trail := "" + nodesStr := strings.Join(nodenames, ",") + for len(nodesStr)+len(trail) > 50 { + skip++ + nodenames = nodenames[:len(nodenames)-1] + nodesStr = strings.Join(nodenames, ",") + trail = fmt.Sprintf(", ... (+%d)", skip) + } + nodesStr += trail + + // Format dash_dict JSON content + dashDict := map[string]any{ + "n": pkgDiffCount, + "nodes": nodesStr, + } + dashDictJSON, err := json.Marshal(dashDict) + if err != nil { + return fmt.Errorf("failed to marshal dash_dict: %v", err) + } + + dashDictMD5 := fmt.Sprintf("%x", md5.Sum(dashDictJSON)) + + query = ` + INSERT INTO dashboard + SET + dash_type = "package differences in cluster", + svc_id = ?, + node_id = "", + dash_severity = ?, + dash_fmt = "%(n)s package differences in cluster %(nodes)s", + dash_dict = ?, + dash_dict_md5 = ?, + dash_created = @now, + dash_updated = @now, + dash_env = ? + ON DUPLICATE KEY UPDATE + dash_severity = ?, + dash_fmt = "%(n)s package differences in cluster %(nodes)s", + dash_dict = ?, + dash_dict_md5 = ?, + dash_updated = @now, + dash_env = ? + ` + + _, err = oDb.DB.ExecContext(ctx, query, + svcID, sev, dashDictJSON, dashDictMD5, monSvctype, + sev, dashDictJSON, dashDictMD5, monSvctype, + ) + if err != nil { + return fmt.Errorf("failed to insert/update dashboard: %v", err) + } + + return nil } - rowAffected, err := result.RowsAffected() + // Get the list of svc_id for instances having recently updated status + rows, err := oDb.DB.QueryContext(ctx, ` + SELECT svcmon.svc_id, svcmon.mon_svctype, svcmon.mon_vmtype + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.node_id = ? 
AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 19 MINUTE) + `, nodeID) if err != nil { - return fmt.Errorf("failed to get affected rows: %v", err) - } else if rowAffected > 0 { - oDb.SetChange("dashboard") + return fmt.Errorf("failed to query svcmon: %v", err) } + defer rows.Close() - request = ` - DELETE d FROM dashboard d - LEFT JOIN svcmon m ON d.svc_id=m.svc_id - WHERE - m.node_id = ? AND - dash_type = "package differences in cluster" AND - dash_updated < @now - ` + var svcIDs []any + todo := make(map[string]func() error) + for rows.Next() { + var svcID string + var monSvctype, monVmtype sql.NullString + if err := rows.Scan(&svcID, &monSvctype, &monVmtype); err != nil { + return fmt.Errorf("failed to scan svcmon row: %v", err) + } - result, err = oDb.DB.ExecContext(ctx, request, nodeID) - if err != nil { - return fmt.Errorf("failed to delete old dashboard entries: %v", err) + // Remember which svc_id needs non-updated alert clean up + svcIDs = append(svcIDs, svcID) + + // Defer after rows.Close() to avoid busy db conn errors + todo[svcID] = func() error { + return processSvcID(svcID, monSvctype.String, monVmtype.String) + } + } + rows.Close() + for svcID, fn := range todo { + if err := fn(); err != nil { + return fmt.Errorf("failed to process svc_id %s: %v", svcID, err) + } } - rowAffected, err = result.RowsAffected() - if err != nil { - return fmt.Errorf("failed to get affected rows: %v", err) - } else if rowAffected > 0 { - oDb.SetChange("dashboard") + // Clean up non updated alerts + if len(svcIDs) > 0 { + query := fmt.Sprintf(` + DELETE FROM dashboard + WHERE svc_id IN (%s) + AND dash_type = "package differences in cluster" + AND dash_updated < @now + `, Placeholders(len(svcIDs))) + + _, err := oDb.DB.ExecContext(ctx, query, svcIDs...) + if err != nil { + return fmt.Errorf("failed to delete old dashboard entries: %v", err) + } } return nil From 667c7c8d008a23f38879f9497513209d456e076b Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Fri, 16 Jan 2026 10:18:03 +0100 Subject: [PATCH 8/9] Remove the "alert_package_differences_in_cluster" scheduler task The sql was too complex and the task did not exist in oc2 --- cdb/db_dashboard.go | 148 --------------------------------------- scheduler/task_alerts.go | 22 ------ 2 files changed, 170 deletions(-) diff --git a/cdb/db_dashboard.go b/cdb/db_dashboard.go index c114845..980723a 100644 --- a/cdb/db_dashboard.go +++ b/cdb/db_dashboard.go @@ -786,154 +786,6 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error { return nil } -func (oDb *DB) DashboardUpdatePkgDiff(ctx context.Context) error { - request := `SET @now = NOW()` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - - request = ` - INSERT INTO dashboard ( - dash_type, - svc_id, - node_id, - dash_severity, - dash_fmt, - dash_dict, - dash_dict_md5, - dash_created, - dash_updated, - dash_env - ) - WITH - pkg_cluster_counts AS ( - SELECT - nodes.cluster_id, - nodes.node_id, - packages.pkg_name, - packages.pkg_version, - packages.pkg_arch, - packages.pkg_type, - COUNT(DISTINCT nodes.node_id) AS node_count - FROM packages - JOIN nodes USING (node_id) - WHERE packages.pkg_name NOT LIKE 'gpg-pubkey%' - GROUP BY - nodes.cluster_id, - packages.pkg_name, - packages.pkg_version, - packages.pkg_arch, - packages.pkg_type - ), - - cluster_nodes_counts AS ( - SELECT - cluster_id, - COUNT(node_id) AS node_count, - GROUP_CONCAT(nodename SEPARATOR ", ") AS nodenames - FROM nodes - GROUP BY cluster_id - ), - - pkg_cluster_diffs AS ( 
- SELECT - pkg_cluster_counts.*, - cluster_nodes_counts.nodenames, - cluster_nodes_counts.node_count AS cluster_node_count - FROM pkg_cluster_counts - JOIN cluster_nodes_counts - USING (cluster_id) - WHERE - pkg_cluster_counts.node_count < cluster_nodes_counts.node_count - ), - - pkg_cluster_diffcount AS ( - SELECT - nodes.cluster_id, - nodes.nodename, - nodes.node_id, - pkg_cluster_diffs.nodenames, - COUNT(*) AS pkg_diffs, - GROUP_CONCAT(DISTINCT pkg_cluster_diffs.pkg_name SEPARATOR ", ") AS pkg_names - FROM pkg_cluster_diffs - JOIN nodes USING (node_id) - GROUP BY - nodes.cluster_id - ), - - alerts AS ( - SELECT - services.svcname, - svcmon.svc_id, - svcmon.mon_svctype, - pkg_cluster_diffcount.* - FROM svcmon - JOIN pkg_cluster_diffcount USING (node_id) - JOIN services USING (svc_id) - ) - - SELECT - 'package differences in cluster' AS dash_type, - a.svc_id, - NULL AS node_id, - IF(a.mon_svctype = "PRD", 1, 0) AS dash_severity, - "%(n)d package differences in cluster %(nodes)s" AS dash_fmt, - JSON_OBJECT( - 'n', a.pkg_diffs, - 'nodes', a.nodenames, - 'cluster_id', a.cluster_id - ) AS dash_dict, - MD5(CONCAT( - a.pkg_diffs, - a.nodenames, - a.cluster_id - )) AS dash_dict_md5, - @now AS dash_created, - @now AS dash_updated, - a.mon_svctype AS dash_env - FROM alerts a - - ON DUPLICATE KEY UPDATE - dash_severity = VALUES(dash_severity), - dash_fmt = VALUES(dash_fmt), - dash_dict = VALUES(dash_dict), - dash_dict_md5 = VALUES(dash_dict_md5), - dash_updated = VALUES(dash_updated), - dash_env = VALUES(dash_env) - ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { - oDb.SetChange("dashboard") - } - - request = ` - DELETE FROM dashboard - WHERE - dash_type="package differences in cluster" AND - ( - dash_updated < @now or - dash_updated IS NULL - ) - ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { - oDb.SetChange("dashboard") - } - - return nil -} - func (oDb *DB) DashboardUpdatePkgDiffForNode(ctx context.Context, nodeID string) error { request := `SET @now = NOW()` _, err := oDb.DB.ExecContext(ctx, request) diff --git a/scheduler/task_alerts.go b/scheduler/task_alerts.go index b162525..b6ddf06 100644 --- a/scheduler/task_alerts.go +++ b/scheduler/task_alerts.go @@ -45,7 +45,6 @@ var TaskAlert1D = Task{ TaskAlertNodeCloseToMaintenanceEnd, TaskAlertNodeMaintenanceExpired, TaskAlertNodeWithoutMaintenanceEnd, - TaskAlertPackageDifferencesInCluster, TaskAlertPurgeActionErrors, TaskPurgeAlertsOnDeletedInstances, }, @@ -131,12 +130,6 @@ var TaskLogInstancesNotUpdated = Task{ timeout: 5 * time.Minute, } -var TaskAlertPackageDifferencesInCluster = Task{ - name: "alert_package_differences_in_cluster", - fn: taskAlertPackageDifferencesInCluster, - timeout: 5 * time.Minute, -} - var TaskAlertPurgeActionErrors = Task{ name: "alert_purge_action_errors", fn: taskAlertPurgeActionErrors, @@ -272,21 +265,6 @@ func taskLogInstancesNotUpdated(ctx context.Context, task *Task) error { return odb.Commit() } -func taskAlertPackageDifferencesInCluster(ctx context.Context, task *Task) error { - odb, err := task.DBX(ctx) - if err != nil { - return err - } - defer odb.Rollback() - if err := odb.DashboardUpdatePkgDiff(ctx); err != nil { - return err - } - if err := odb.Session.NotifyChanges(ctx); err != nil { - return err - } - return odb.Commit() -} 
- func taskAlertPurgeActionErrors(ctx context.Context, task *Task) error { odb, err := task.DBX(ctx) if err != nil { From 90ea9e16cb2b4efd75eaf9ba73f1473b3db125fe Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Fri, 16 Jan 2026 11:15:43 +0100 Subject: [PATCH 9/9] Add the "scrub_static" scheduler task Also simplify the scheduler config, moving away from per-task directories setup, to a scheduler-level directories map: scheduler: directories: uploads: /srv/oc3/uploads static: /srv/oc3/static ... --- scheduler/task_metrics.go | 19 ++------ scheduler/task_scrub.go | 93 +++++++++++++++++++++++---------------- 2 files changed, 58 insertions(+), 54 deletions(-) diff --git a/scheduler/task_metrics.go b/scheduler/task_metrics.go index 9c2f09f..d608fca 100644 --- a/scheduler/task_metrics.go +++ b/scheduler/task_metrics.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "os" "path/filepath" "strings" "time" @@ -25,21 +24,11 @@ var TaskMetrics = Task{ } func MakeWSPFilename(format string, args ...any) (string, error) { - var dir string - candidates := viper.GetStringSlice("scheduler.task.metrics.directories") - if len(candidates) == 0 { - return "", fmt.Errorf("scheduler.task.metrics.directories is not set") + directory := viper.GetString("scheduler.directories.uploads") + if directory == "" { + return "", fmt.Errorf("define scheduler.directories.uploads") } - for _, d := range candidates { - if _, err := os.Stat(d); err == nil { - dir = d - break - } - } - if dir == "" { - return "", fmt.Errorf("scheduler.task.metrics.directories has no existing entry") - } - return filepath.Join(dir, fmt.Sprintf(format+".wsp", args...)), nil + return filepath.Join(directory, "stats", fmt.Sprintf(format+".wsp", args...)), nil } func taskMetrics(ctx context.Context, task *Task) error { diff --git a/scheduler/task_scrub.go b/scheduler/task_scrub.go index da039d7..ad5a707 100644 --- a/scheduler/task_scrub.go +++ b/scheduler/task_scrub.go @@ -93,6 +93,12 @@ var TaskScrubSvcdisks = Task{ timeout: time.Minute, } +var TaskScrubStatic = Task{ + name: "scrub_static", + fn: taskScrubStatic, + timeout: time.Minute, +} + var TaskScrubTempviz = Task{ name: "scrub_tempviz", fn: taskScrubTempviz, @@ -157,6 +163,7 @@ var TaskScrub1D = Task{ TaskScrubPatches, TaskScrubPdf, TaskScrubResmon, + TaskScrubStatic, TaskScrubStorArray, TaskScrubSvcdisks, TaskUpdateStorArrayDGQuota, @@ -578,21 +585,12 @@ func taskUpdateStorArrayDGQuota(ctx context.Context, task *Task) error { return odb.Commit() } -func taskScrubTempviz(ctx context.Context, task *Task) error { - threshold := time.Now().Add(-1 * time.Hour) - directories := viper.GetStringSlice("scheduler.task.scrub_tempviz.directories") - if len(directories) == 0 { - slog.Warn("skip: define scheduler.task.scrub_tempviz.directories") - return nil - } +func scrubFiles(pattern string, threshold time.Time) error { var matches []string - for _, directory := range directories { - pattern := filepath.Join(directory, "tempviz*") - if m, err := filepath.Glob(pattern); err != nil { - return fmt.Errorf("failed to glob files: %w", err) - } else { - matches = append(matches, m...) - } + if m, err := filepath.Glob(pattern); err != nil { + return fmt.Errorf("failed to glob files: %w", err) + } else { + matches = append(matches, m...) 
} for _, fpath := range matches { fileInfo, err := os.Stat(fpath) @@ -610,36 +608,53 @@ func taskScrubTempviz(ctx context.Context, task *Task) error { return nil } -func taskScrubPdf(ctx context.Context, task *Task) error { - threshold := time.Now().Add(-24 * time.Hour) - directories := viper.GetStringSlice("scheduler.task.scrub_pdf.directories") - if len(directories) == 0 { - slog.Warn("skip: define scheduler.task.scrub_pdf.directories") +func taskScrubStatic(ctx context.Context, task *Task) error { + threshold := time.Now().Add(-1 * time.Hour) + directory := viper.GetString("scheduler.directories.static") + if directory == "" { + slog.Warn("skip: define scheduler.directories.static") return nil } - var matches []string - for _, directory := range directories { - pattern := filepath.Join(directory, "*-*-*-*-*.pdf") - if m, err := filepath.Glob(pattern); err != nil { - return fmt.Errorf("failed to glob files: %w", err) - } else { - matches = append(matches, m...) - } + if err := scrubFiles(filepath.Join(directory, "tempviz*.png"), threshold); err != nil { + return err } - for _, fpath := range matches { - fileInfo, err := os.Stat(fpath) - if err != nil { - return err - } - mtime := fileInfo.ModTime() - if mtime.Before(threshold) { - slog.Info(fmt.Sprintf("rm %s mtime %s", fpath, mtime)) - if err := os.Remove(fpath); err != nil { - return fmt.Errorf("failed to rm %s: %w", fpath, err) - } - } + if err := scrubFiles(filepath.Join(directory, "tempviz*.dot"), threshold); err != nil { + return err + } + if err := scrubFiles(filepath.Join(directory, "stats_*_[0-9]*.png"), threshold); err != nil { + return err + } + if err := scrubFiles(filepath.Join(directory, "stat_*_[0-9]*.png"), threshold); err != nil { + return err + } + if err := scrubFiles(filepath.Join(directory, "stats_*_[0-9]*.svg"), threshold); err != nil { + return err + } + if err := scrubFiles(filepath.Join(directory, "*-*-*-*.pdf"), threshold); err != nil { + return err } return nil + +} + +func taskScrubTempviz(ctx context.Context, task *Task) error { + threshold := time.Now().Add(-1 * time.Hour) + directory := viper.GetString("scheduler.directories.static") + if directory == "" { + slog.Warn("skip: define scheduler.directories.static") + return nil + } + return scrubFiles(filepath.Join(directory, "tempviz*"), threshold) +} + +func taskScrubPdf(ctx context.Context, task *Task) error { + threshold := time.Now().Add(-24 * time.Hour) + directory := viper.GetString("scheduler.directories.static") + if directory == "" { + slog.Warn("skip: define scheduler.directories.static") + return nil + } + return scrubFiles(filepath.Join(directory, "*-*-*-*-*.pdf"), threshold) } func taskScrubUnfinishedActions(ctx context.Context, task *Task) error {
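
Note on the configuration migration introduced by PATCH 9/9: the per-task
directory lists (scheduler.task.metrics.directories,
scheduler.task.scrub_tempviz.directories, scheduler.task.scrub_pdf.directories)
are replaced by a single scheduler-level map read with viper.GetString. A
minimal sketch of the resulting configuration, using the illustrative paths
from the commit message rather than any built-in defaults:

    scheduler:
      directories:
        # read by MakeWSPFilename; .wsp metric files are written under <uploads>/stats
        uploads: /srv/oc3/uploads
        # read by scrub_static, scrub_tempviz and scrub_pdf; files matching the
        # tempviz*, stats_*, stat_* and *.pdf globs are removed once older than
        # the task's age threshold (1 hour, or 24 hours for scrub_pdf)
        static: /srv/oc3/static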