Skip to content
214 changes: 214 additions & 0 deletions cdb/db_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package cdb

import (
"context"
"crypto/md5"
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
)

Expand Down Expand Up @@ -782,3 +785,214 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error {
}
return nil
}

// DashboardUpdatePkgDiffForNode refreshes the "package differences in cluster"
// dashboard alerts of the services having an instance on nodeID whose status
// was updated within the last 19 minutes.
//
// For each of these services, the peer nodes whose instance status was updated
// within the last 20 minutes are compared (see dashboardUpdatePkgDiffForSvc),
// and an alert is inserted or updated when their package sets differ. Alerts of
// these services that were not refreshed by this run are then deleted.
//
// The run timestamp is stored in the @now SQL user variable and reused by the
// upsert and delete statements.
// NOTE(review): this presumably requires oDb.DB to execute every statement on
// the same database session — confirm against the DB type.
//
// The first error met is returned; query, scan, iteration and encoding errors
// are wrapped so that callers can inspect them with errors.Is / errors.As.
func (oDb *DB) DashboardUpdatePkgDiffForNode(ctx context.Context, nodeID string) error {
	if _, err := oDb.DB.ExecContext(ctx, `SET @now = NOW()`); err != nil {
		return err
	}

	// Get the list of svc_id for instances having recently updated status
	rows, err := oDb.DB.QueryContext(ctx, `
SELECT svcmon.svc_id, svcmon.mon_svctype, svcmon.mon_vmtype
FROM svcmon
JOIN nodes ON svcmon.node_id = nodes.node_id
WHERE svcmon.node_id = ? AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 19 MINUTE)
`, nodeID)
	if err != nil {
		return fmt.Errorf("failed to query svcmon: %w", err)
	}
	defer rows.Close()

	type instance struct {
		svctype string
		vmtype  string
	}

	var svcIDs []any
	// Keyed by svc_id: a service is processed once, the last scanned row wins.
	instances := make(map[string]instance)
	for rows.Next() {
		var svcID string
		var monSvctype, monVmtype sql.NullString
		if err := rows.Scan(&svcID, &monSvctype, &monVmtype); err != nil {
			return fmt.Errorf("failed to scan svcmon row: %w", err)
		}

		// Remember which svc_id needs non-updated alert clean up
		svcIDs = append(svcIDs, svcID)
		instances[svcID] = instance{svctype: monSvctype.String, vmtype: monVmtype.String}
	}
	// rows.Next() also returns false on failure: do not mistake a broken
	// iteration for a complete result set.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("failed to iterate svcmon rows: %w", err)
	}
	// Release the result set before running the per-service statements, to
	// avoid busy db conn errors.
	if err := rows.Close(); err != nil {
		return fmt.Errorf("failed to close svcmon rows: %w", err)
	}

	for svcID, inst := range instances {
		if err := oDb.dashboardUpdatePkgDiffForSvc(ctx, svcID, inst.svctype, inst.vmtype); err != nil {
			return fmt.Errorf("failed to process svc_id %s: %w", svcID, err)
		}
	}

	// Clean up non updated alerts
	if len(svcIDs) == 0 {
		return nil
	}
	query := fmt.Sprintf(`
DELETE FROM dashboard
WHERE svc_id IN (%s)
AND dash_type = "package differences in cluster"
AND dash_updated < @now
`, Placeholders(len(svcIDs)))
	if _, err := oDb.DB.ExecContext(ctx, query, svcIDs...); err != nil {
		return fmt.Errorf("failed to delete old dashboard entries: %w", err)
	}
	return nil
}

// dashboardUpdatePkgDiffForSvc upserts the "package differences in cluster"
// alert of svcID when its peer nodes do not all have the same packages.
//
// Nothing is written when the service has fewer than two peers or when no
// difference is found. The alert severity is 1 when monSvctype is "PRD", else
// 0. The statement relies on the @now user variable set by the caller.
func (oDb *DB) dashboardUpdatePkgDiffForSvc(ctx context.Context, svcID, monSvctype, monVmtype string) error {
	nodeIDs, nodenames, err := oDb.pkgDiffPeerNodes(ctx, svcID, monVmtype != "")
	if err != nil {
		return err
	}
	if len(nodeIDs) < 2 {
		return nil
	}

	// Count the (name, version, arch, type) package tuples that are not
	// present on every peer.
	query := fmt.Sprintf(`
SELECT COUNT(pkg_name)
FROM (
SELECT
pkg_name,
pkg_version,
pkg_arch,
pkg_type,
COUNT(DISTINCT node_id) AS c
FROM packages
WHERE
node_id IN (%s)
AND pkg_name NOT LIKE "gpg-pubkey%%"
GROUP BY
pkg_name,
pkg_version,
pkg_arch,
pkg_type
) AS t
WHERE t.c != ?
`, Placeholders(len(nodeIDs)))
	args := make([]any, 0, len(nodeIDs)+1)
	args = append(args, nodeIDs...)
	args = append(args, len(nodeIDs))

	var pkgDiffCount int
	if err := oDb.DB.QueryRowContext(ctx, query, args...).Scan(&pkgDiffCount); err != nil {
		return fmt.Errorf("failed to count package differences: %w", err)
	}
	if pkgDiffCount == 0 {
		return nil
	}

	sev := 0
	if monSvctype == "PRD" {
		sev = 1
	}

	// Format dash_dict JSON content
	dashDictJSON, err := json.Marshal(map[string]any{
		"n":     pkgDiffCount,
		"nodes": pkgDiffNodesLabel(nodenames),
	})
	if err != nil {
		return fmt.Errorf("failed to marshal dash_dict: %w", err)
	}
	dashDictMD5 := fmt.Sprintf("%x", md5.Sum(dashDictJSON))

	_, err = oDb.DB.ExecContext(ctx, `
INSERT INTO dashboard
SET
dash_type = "package differences in cluster",
svc_id = ?,
node_id = "",
dash_severity = ?,
dash_fmt = "%(n)s package differences in cluster %(nodes)s",
dash_dict = ?,
dash_dict_md5 = ?,
dash_created = @now,
dash_updated = @now,
dash_env = ?
ON DUPLICATE KEY UPDATE
dash_severity = ?,
dash_fmt = "%(n)s package differences in cluster %(nodes)s",
dash_dict = ?,
dash_dict_md5 = ?,
dash_updated = @now,
dash_env = ?
`,
		svcID, sev, dashDictJSON, dashDictMD5, monSvctype,
		sev, dashDictJSON, dashDictMD5, monSvctype,
	)
	if err != nil {
		return fmt.Errorf("failed to insert/update dashboard: %w", err)
	}
	return nil
}

// pkgDiffPeerNodes returns the ids and names, ordered by nodename, of the
// nodes having an instance of svcID whose status was updated within the last
// 20 minutes. When encap is true only instances with a non-empty mon_vmtype
// are considered, otherwise only those with an empty mon_vmtype.
//
// The result set is released before returning, so the caller can safely run
// further statements.
func (oDb *DB) pkgDiffPeerNodes(ctx context.Context, svcID string, encap bool) ([]any, []string, error) {
	vmtypeOp := "="
	if encap {
		vmtypeOp = "!="
	}
	query := fmt.Sprintf(`
SELECT DISTINCT nodes.node_id, nodes.nodename
FROM svcmon
JOIN nodes ON svcmon.node_id = nodes.node_id
WHERE svcmon.svc_id = ?
AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 20 MINUTE)
AND svcmon.mon_vmtype %s ""
ORDER BY nodes.nodename
`, vmtypeOp)

	rows, err := oDb.DB.QueryContext(ctx, query, svcID)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to query nodes: %w", err)
	}
	defer rows.Close()

	var nodeIDs []any
	var nodenames []string
	for rows.Next() {
		var nodeID, nodename string
		if err := rows.Scan(&nodeID, &nodename); err != nil {
			return nil, nil, fmt.Errorf("failed to scan node row: %w", err)
		}
		nodeIDs = append(nodeIDs, nodeID)
		nodenames = append(nodenames, nodename)
	}
	if err := rows.Err(); err != nil {
		return nil, nil, fmt.Errorf("failed to iterate node rows: %w", err)
	}
	return nodeIDs, nodenames, nil
}

// pkgDiffNodesLabel joins nodenames with "," and, while the result is longer
// than 50 bytes, drops trailing names and appends a ", ... (+N)" suffix where
// N is the number of dropped names.
func pkgDiffNodesLabel(nodenames []string) string {
	const maxLen = 50

	skipped := 0
	trail := ""
	label := strings.Join(nodenames, ",")
	for len(nodenames) > 0 && len(label)+len(trail) > maxLen {
		skipped++
		nodenames = nodenames[:len(nodenames)-1]
		label = strings.Join(nodenames, ",")
		trail = fmt.Sprintf(", ... (+%d)", skipped)
	}
	return label + trail
}
61 changes: 61 additions & 0 deletions cdb/db_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,64 @@ func (oDb *DB) UpdateVirtualAssets(ctx context.Context) error {
}
return nil
}

// UpdateVirtualAsset copies the location, power and enclosure attributes of
// the node hosting the instance of service svcID on node nodeID to the node
// whose nodename is that instance's mon_vmname.
//
// Only non-empty source values overwrite the target, and the target is only
// updated when its app is either the service app or the hosting node app.
// When at least one row is modified, the "nodes" table is flagged as changed
// through oDb.SetChange.
func (oDb *DB) UpdateVirtualAsset(ctx context.Context, svcID, nodeID string) error {
	const query = `
UPDATE nodes n
JOIN (
SELECT
svcmon.mon_vmname AS vmname,
services.svc_app AS svc_app,
nodes.app AS node_app,
nodes.loc_addr,
nodes.loc_city,
nodes.loc_zip,
nodes.loc_room,
nodes.loc_building,
nodes.loc_floor,
nodes.loc_rack,
nodes.power_cabinet1,
nodes.power_cabinet2,
nodes.power_supply_nb,
nodes.power_protect,
nodes.power_protect_breaker,
nodes.power_breaker1,
nodes.power_breaker2,
nodes.loc_country,
nodes.enclosure
FROM svcmon
JOIN services ON svcmon.svc_id = services.svc_id
JOIN nodes ON svcmon.node_id = nodes.node_id
WHERE svcmon.svc_id = ? AND svcmon.node_id = ?
) AS source
SET
n.loc_addr = COALESCE(NULLIF(source.loc_addr, ''), n.loc_addr),
n.loc_city = COALESCE(NULLIF(source.loc_city, ''), n.loc_city),
n.loc_zip = COALESCE(NULLIF(source.loc_zip, ''), n.loc_zip),
n.loc_room = COALESCE(NULLIF(source.loc_room, ''), n.loc_room),
n.loc_building = COALESCE(NULLIF(source.loc_building, ''), n.loc_building),
n.loc_floor = COALESCE(NULLIF(source.loc_floor, ''), n.loc_floor),
n.loc_rack = COALESCE(NULLIF(source.loc_rack, ''), n.loc_rack),
n.power_cabinet1 = COALESCE(NULLIF(source.power_cabinet1, ''), n.power_cabinet1),
n.power_cabinet2 = COALESCE(NULLIF(source.power_cabinet2, ''), n.power_cabinet2),
n.power_supply_nb = COALESCE(NULLIF(source.power_supply_nb, ''), n.power_supply_nb),
n.power_protect = COALESCE(NULLIF(source.power_protect, ''), n.power_protect),
n.power_protect_breaker = COALESCE(NULLIF(source.power_protect_breaker, ''), n.power_protect_breaker),
n.power_breaker1 = COALESCE(NULLIF(source.power_breaker1, ''), n.power_breaker1),
n.power_breaker2 = COALESCE(NULLIF(source.power_breaker2, ''), n.power_breaker2),
n.loc_country = COALESCE(NULLIF(source.loc_country, ''), n.loc_country),
n.enclosure = COALESCE(NULLIF(source.enclosure, ''), n.enclosure)
WHERE
n.nodename = source.vmname AND
n.app IN (source.svc_app, source.node_app)`

	res, err := oDb.DB.ExecContext(ctx, query, svcID, nodeID)
	if err != nil {
		return err
	}

	updated, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if updated < 1 {
		// Nothing was modified: no change to advertise.
		return nil
	}

	oDb.SetChange("nodes")
	return nil
}
4 changes: 4 additions & 0 deletions scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (t *Scheduler) Debugf(format string, args ...any) {

func (t *Scheduler) toggleTasks(ctx context.Context, states map[string]State) {
for _, task := range Tasks {
if task.period == 0 {
//task.Debugf("skip: no period")
continue
}
name := task.Name()
storedState, _ := states[name]
cachedState, hasCachedState := t.states[name]
Expand Down
2 changes: 1 addition & 1 deletion scheduler/task_alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var TaskAlert1M = Task{
TaskAlertInstancesNotUpdated,
},
period: time.Minute,
timeout: 15 * time.Minute,
timeout: 15 * time.Second,
}

var TaskAlert1H = Task{
Expand Down
19 changes: 4 additions & 15 deletions scheduler/task_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"
Expand All @@ -25,21 +24,11 @@ var TaskMetrics = Task{
}

// MakeWSPFilename returns the path of a ".wsp" file whose base name is
// fmt.Sprintf(format, args...) with the ".wsp" suffix appended. An error is
// returned when the directory setting it depends on is not usable.
//
// NOTE(review): this span appears to interleave two versions of the body
// coming from a rendered diff: one picking the first existing entry of
// scheduler.task.metrics.directories (checked with os.Stat), the other using
// the "stats" subdirectory of scheduler.directories.uploads. Confirm which
// lines are live before relying on either behavior.
func MakeWSPFilename(format string, args ...any) (string, error) {
var dir string
candidates := viper.GetStringSlice("scheduler.task.metrics.directories")
if len(candidates) == 0 {
return "", fmt.Errorf("scheduler.task.metrics.directories is not set")
directory := viper.GetString("scheduler.directories.uploads")
if directory == "" {
return "", fmt.Errorf("define scheduler.directories.uploads")
}
for _, d := range candidates {
if _, err := os.Stat(d); err == nil {
dir = d
break
}
}
if dir == "" {
return "", fmt.Errorf("scheduler.task.metrics.directories has no existing entry")
}
return filepath.Join(dir, fmt.Sprintf(format+".wsp", args...)), nil
return filepath.Join(directory, "stats", fmt.Sprintf(format+".wsp", args...)), nil
}

func taskMetrics(ctx context.Context, task *Task) error {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/task_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// TaskUpdateVirtualAssets declares the "update_virtual_assets" task, whose
// work is done by taskUpdateVirtualAssets.
//
// NOTE(review): two timeout values are listed below (time.Minute and
// 10 * time.Second); this looks like the removed and added lines of a rendered
// diff — confirm which one is the effective timeout.
var TaskUpdateVirtualAssets = Task{
name: "update_virtual_assets",
fn: taskUpdateVirtualAssets,
timeout: time.Minute,
timeout: 10 * time.Second,
}

func taskUpdateVirtualAssets(ctx context.Context, task *Task) error {
Expand Down
Loading
Loading