diff --git a/api/api.yaml b/api/api.yaml index e934272..842964f 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -5,7 +5,7 @@ servers: info: title: opensvc collector api - version: 1.0.8 + version: 1.0.9 paths: /feed/daemon/ping: @@ -197,6 +197,64 @@ paths: tags: - agent + /feed/action: + put: + description: | + End an action for a given object path + operationId: PutFeedActionEnd + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/Action' + responses: + 202: + description: action end accepted + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 403: + $ref: '#/components/responses/403' + 500: + $ref: '#/components/responses/500' + security: + - basicAuth: [ ] + - bearerAuth: [ ] + tags: + - agent + post: + description: | + Begin an action for a given object path + operationId: PostFeedAction + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/Action' + responses: + 202: + description: action begin accepted + content: + application/json: + schema: + $ref: '#/components/schemas/ActionRequestAccepted' + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 403: + $ref: '#/components/responses/403' + 500: + $ref: '#/components/responses/500' + security: + - basicAuth: [ ] + - bearerAuth: [ ] + tags: + - agent + /version: get: operationId: GetVersion @@ -292,6 +350,55 @@ components: type: string description: the opensvc client data version + ActionRequestAccepted: + type: object + required: + - uuid + properties: + uuid: + type: string + + Action: + type: object + required: + - path + - action + - version + - begin + - cron + - session_uuid + - argv + - uuid + - end + - actionlogfile + - status + properties: + path: + type: string + action: + type: string + version: + type: string + description: the opensvc client data version + begin: + type: string + cron: + type: boolean + session_uuid: + type: string + argv: + type: array + items: + type: string + uuid: + type: string + end: + type: string + actionlogfile: + type: string + status: + type: string + NodeDisks: type: object properties: @@ -558,3 +665,5 @@ components: items: type: string description: object name + + \ No newline at end of file diff --git a/api/codegen_server_gen.go b/api/codegen_server_gen.go index ab80781..fbbc68a 100644 --- a/api/codegen_server_gen.go +++ b/api/codegen_server_gen.go @@ -24,6 +24,12 @@ type ServerInterface interface { // (GET /docs/openapi) GetSwagger(ctx echo.Context) error + // (POST /feed/action) + PostFeedAction(ctx echo.Context) error + + // (PUT /feed/action) + PutFeedActionEnd(ctx echo.Context) error + // (POST /feed/daemon/ping) PostFeedDaemonPing(ctx echo.Context) error @@ -63,6 +69,32 @@ func (w *ServerInterfaceWrapper) GetSwagger(ctx echo.Context) error { return err } +// PostFeedAction converts echo context to params. +func (w *ServerInterfaceWrapper) PostFeedAction(ctx echo.Context) error { + var err error + + ctx.Set(BasicAuthScopes, []string{}) + + ctx.Set(BearerAuthScopes, []string{}) + + // Invoke the callback with all the unmarshaled arguments + err = w.Handler.PostFeedAction(ctx) + return err +} + +// PutFeedActionEnd converts echo context to params. 
+func (w *ServerInterfaceWrapper) PutFeedActionEnd(ctx echo.Context) error { + var err error + + ctx.Set(BasicAuthScopes, []string{}) + + ctx.Set(BearerAuthScopes, []string{}) + + // Invoke the callback with all the unmarshaled arguments + err = w.Handler.PutFeedActionEnd(ctx) + return err +} + // PostFeedDaemonPing converts echo context to params. func (w *ServerInterfaceWrapper) PostFeedDaemonPing(ctx echo.Context) error { var err error @@ -201,6 +233,8 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL } router.GET(baseURL+"/docs/openapi", wrapper.GetSwagger) + router.POST(baseURL+"/feed/action", wrapper.PostFeedAction) + router.PUT(baseURL+"/feed/action", wrapper.PutFeedActionEnd) router.POST(baseURL+"/feed/daemon/ping", wrapper.PostFeedDaemonPing) router.POST(baseURL+"/feed/daemon/status", wrapper.PostFeedDaemonStatus) router.POST(baseURL+"/feed/instance/resource_info", wrapper.PostFeedInstanceResourceInfo) @@ -215,37 +249,40 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xZW3PbuBX+Kxi0D8kMV5Lt9DLqU3bTdN121m7ktA+2xwOBRxQ2JMAAh4qVDP97BwCv", - "IkTJ3ijTmfTJFnF4Lt+54vAL5SrLlQSJhs6/0JxplgGCdr+E/FcBervYSu5/0jn9aJ/QiEqWAZ1TY88i", - "avgaMmaJcJvb50ulUmCSlmUZUQ0mV9KAY/pqNrN/uJIIEu2/LM9TwRkKJae/GiXts5bh7zWs6Jz+btpq", - "OvWnZnqt1TKFzEuJwXAtcsuGzumPLCbv4GMBBmkZ0Vezs28h9b1kBa6VFp8h9mIvvoXYt0ovRRyDtDL/", - "8G0AvpQIWrKULEBvQJO/aq20lf8WIH7DIFPyWsjkNeeQI8RPUinXKgeNwkeMWv4KHB8+CVyrAh+4kiuR", - "2IO+QqkwSNSKeHJiA9QQXDMkGj4WQoMh11eLGzJV/GK6AoinnnJaMYyoQMjMkHGHIY3qADeohUysvdUD", - "pjXb0rJ94F8LQRc7dIhBhoUhKDIwyLLckE8iTckSiIaVBrOGmKyUJlLF0Ad24d78P7SHoB3Ds6yrllPr", - "jTAfhvDESaektWqJOPg4UzGkwZMK5pzhOniuIRHeUYMjIz6DPVgpnTGkcyokXpy3YAmJkIBLvMJAV7HO", - "yQZkrHSAv5PtPBjT+a01rK9sJb/i3TCqbY0sQvcDt0T0UhpkksM7MKrQHC7lSg3hFdXTJjj6xx9ga8aP", - "g4htWFrAYVvt6zVxyIR+7EVUB72+w1Q7BJ3ix/DcGxCocpWqZHtYYuUmB+WYJ3zJCIQ4w27fbl/cq9sG", - "tKmitZ97uAaicpBmwwlPBUgkljupX4iOM8Vp1IoJGfWLisFm7Ig9TdCMtTeX9UfUmYheuf9+aupjXyjL", - "8yBUXGVZVZUHZ7HOH1wlGjvsx/+B+hhRkJsg3SqFx4eMPYargz8VcuQUmU4AwwSZkgKVhvhBV9n+wFUh", - "91ArzddgUDMMW76/RrJPnf7UlMPlFoOtw3CVw9PQe2LShQLzWhnszz/DYGn8Gu6xPC0MQtWl9jbOLtWR", - "7bOu7EfI7vb5r9S8T1g3PKCteeO1o++ifWWRr5lMdtIvaL3SRFQl9hgQ9pdbDRuhCvNQ5DFDiB8Y9uLc", - "PvzBjoohKc9552TuqOp3jeABb1QXjGEdB2QiHWr3mqyLjMkfNLCYLVMg8JinTLpxl5gcuFgJTlARXAtD", - "FOeF1iA52PDGNdzJ3Euc3Mlg2WjioS/2Zg3k55ub63qy5DbvXty+e/vTn84vzu4jsgDuVPjjS5KABFvf", - "YrLceplKi0RIYvxNyc6gYe1ISLlO8USBKYQwMWulMdqFxhRZxvR2hzmxfCeEXCJZ/Hz1/p9v7uQvVzfE", - "+4ustMq6iqHar2ZE4NFeQe6kNSkvdK4MGEuUKs5S8dl75QVMkklECiNkYl9lHMUGSHVluZMSEoXC0f6F", - "GAASgPVi8upl0GW7wefDpnFkjVko9nLGP7AEAu1c83APcpmepk/MNL8qCc72yUhjGh+9DpREX5PblHUm", - "VZy94BAiZmswlIwdpI6aq2r6o0arjlF9qZ0DeGRZbkOfziazydnBMNhfcqyVwAstcLuw2npRS2YEf134", - "0cNZ4ZZY9mkra42YW4WXwDTomtr/eltHwt//c1PvwxwLd7rLoyyj5vJT5TRtSq5KU+CoNGG56PhwTs8m", - "s8mfXR/PQdrDOb2YzCYz6qcmZ8g0VtxMG4IvtBrbLK4uwy5jOqd/A1x8YkniNOtt586fuDw6eCG/+kdn", - "/RaKmUb81BK1O7NDtBedXdc4rSVykccS46a3YpkKTu/tM7+t8FuDaV7Pa8rgsMq+80uEZkjat8VxVaqP", - "d2As9PEKBn9U8fbr7euGgsp+bqAuoBw4/fwwinvWemVEz2evhmhlwrh630epXr9E1b6oAr46FYY0inYT", - "lc5veyl6e19GX3ppeHtf3rcOZokFcujftrk/w8MH3bqoO87pHVuJ+vqu3VksumycHZONs/+ZLP9qQVOP", - "9dPmWluX7HD0vHdDuL0QcA32v5oBqRkQy2AkjoJbs9PEU1DU8fHUtzxsqG2IdnQbLGFP5KZD2b1Apdub", - "zfnkrHVQVX/szWVyhH+aTO9+LrsNA96STLuf06xJp/TrUyrEbMSjFTLGLXiIKTgHY1ZFmm7JC7OVfK2V", - "VIV56fvA+WFO7SqeVVWGvGC7nL7fomOb4zSuP0YcV2fcDsi+Y6+lciWSwsfuSCTXC9QTVZd2P/vckrLP", 
- "plNXE4d/5xoU9IBFkXgiqyNXG9DbEbQXnt9psK6UfS7QtRkM2YHPj99vUvY/Zh6fmNWK8Nik7H1jOE2w", - "9ET85nb/rTKzswiorrN9hTRgoaW9L3eWlIMb77+bo9904x2Dt5a+9x78rPsqZzlbilS4vch96VHVm3rq", - "KHRK53Sq+AUt78v/BgAA//89OreI0iMAAA==", + "H4sIAAAAAAAC/+xaW28buRX+KwTbhwSYlWQ7bVH1ydkkXbfF2o2c9sEWBIpzNOJmhpyQHMVKoP9e8DYX", + "iRrJ3igI4D4lHh6ey3cuPDzUV0xFUQoOXCs8/opLIkkBGqT9i/F/VyDXkzWn7k88xp/MF5xgTgrAY6zM", + "WoIVXUJBDJFel+b7XIgcCMebzSbBElQpuALL9NVoZP6hgmvg2vyXlGXOKNFM8OFvSnDzrWH4RwkLPMZ/", + "GDaaDt2qGt5IMc+hcFJSUFSy0rDBY/yapOg9fKpAabxJ8KvR2feQ+oGTSi+FZF8gdWIvvofYd0LOWZoC", + "NzL/9H0AvuIaJCc5moBcgURvpRTSyH8HkL4hUAh+w3h2SSmUGtJHqVRKUYLUzEWMmP8GVM8+M70UlZ5R", + "wRcsMwtdhXKmNBIL5MiRCVCF9JJoJOFTxSQodHM9uUVDQS+GC4B06CiHnmGCmYZC7TJuMcRJCHClJeOZ", + "sdd/IFKSNd40H9y2GHSpRQcpTXSlkGYFKE2KUqHPLM/RHJCEhQS1hBQthERcpNAFdmJ3/h/aQ9D24bkJ", + "VcuqdUkdg22ASP19Rzm3lItswXKIU8hsZQtnMP6AgQmeQ8bi0qjsqFHX1wQDT6M7SqKX0QUFSjHBZ1XF", + "4jsdetGlvXtWIJVHqusRvQQkSuBqRRHNGXCNUqIJCht23G4PDBtVKR7fOTMC2rgRFMDy0GyZ5cH3CjuQ", + "tl1WGzrdCazEB4Q/QdqZ1o2PPXhs2WCpYlLeMPVxl2maRSHeg3whUsijKz679waChGxfcCv2xcb0QsiC", + "aDzGjOuL88ZZjGvIwNb7SkFbsdbKCngq5GF4rIfaynr5nnfNKNiaGIRicF5xpQmn8B6UqCSFK74Qu/Ay", + "/7VOy+7yR1ir/uV4ApC8gsO2mu2BOGbCdkWQx8SXtAhaxY/huTcgtChFLrL1YYneTRbKPk9M6lKyFeJE", + "t9vFZuNe3U5dYKxGjZiYUb+KFEzG9thTB01fV2Wz/ojjLcHX9n8/18fy1uFUlvGzQhSFbwZ21lJZzuwB", + "2LeoHndqAV9F6RY5PMwK8hCvDm61c9xtr2oiM9BxgkJwpoWEdCZ9ts+oqPgeaiHpEpSWRMPjDktJPrfa", + "orocztc62rEoKkp4HHqPTLpYYN4Ipbtt926w1H6Nt3Y0r5QG3xzt7dfaVEd2baGyHyG73V5+o57xhHXD", + "AdqY1187ui7aVxbpkvBsK/2i1guJmC+xx4Cwv9xKWDFRqVlVpkRDOiO6E+fm40/mhhKT8pQ9J3OHr98B", + "wQPe8Pfa3ToOmrB8V7tLtKwKwn+SQFIyzwHBQ5kTbm9ZSJVA2YJRpAXSS6aQoLSSEjgFE956Cfe8dBIH", + "9zxaNup46Iq9XQL65fb2JlxoqMm7F3fv3/38l/OLs2mCJmD7VPTnlygDDqa+pWi+djKFZBnjSLkLurn6", + "xLVDMeVaxVMz7W4425iopZA62YZGVUVB5HqLOTJ8BwhdaTT55frDv97c81+vb5HzF1pIUbQV02K/mgmC", + "B9OP33NjUlnJUihQhigXlOTsi/PKCxhkgwRVivHMbDXN/wqQvynfcw6Z0MzS/g0pABSB9WLw6mXUZdvB", + "58KmdmTALBZ7JaEfSQaR41zS+BlkMz3PH5lpbkIX7e2znoOpv/U6UBJdTW5S1prkOTvBMUTUWulYMraQ", + "OqqvCvRHtVYto7pSWwvwQIrShD4eDUaDs4NhsL/k2Gs3rSTT64nR1omaE8XoZeVaD2uFvdubr42spdal", + "mw0QCTJQu7/ehUj4x39vwxjWsrCr2zw2m6S+/PicxnXJFXkOVAuJSMlaPhzjs8Fo8Fd7jpfAzeIYXwxG", + "gxF2XZM1ZJgKqoY1wVfs2zaDq82wqxSP8d9BTz6TLLOadYbC54+cWR6cA13/szX1jcVMLX5oiJpR7SHa", + "i9aItZ/WENnII5my3Vs1zxnFU/PNDcma4VIplN4tsK/BlG/CkSO0FZygjK2Ah17JeMDWpy7Sodu4DFMT", + "6QYZr0W6/mbDYc98080CLSvY7Lj3/BtL3R7MRELAgzZ3INaESXiBOOTp0Q8TQaFy4PFdp2bcTTfJ105d", + "uJtupk3EkcygPTVnThUJr7c8fVpwVa3YemsHaz9EeEXdD8bI5+z8utq40fiwDLfDaMl57ybl9ZVs31NF", + "T81pXUJPExgRQccHST/oe96uNgk+H73aRatgynaXXZTCG0PiH0U88H6VKVQregr/NleJJ3j4oFsnob89", + "vWO9qG/v2q3Xs2ddFMIQYVgP0UKDGI+eD/bKj4REVIL5X2CAAgNkGPTEUXRGf5p4iop66nkSN9S03+aY", + "2XlpPJGbDmX3RAvZzFHOB2eNg3z9SYkmgyP8U2d6+zchd3HAG5Jh+zcjxqRT+vUxFWLU41GPjLLjZKQq", + "SkGpRZXna/RCrTldSsFFpV66c+D8MKfmvTm0HugF2eb0fIuOORyHaXj6PK7O2Imz2aOQG8dXLnZ7Ijk8", + "15youjSvQU8tKftsOnU1sfi3hi5RDxgUkSMyOlKxArnuQXvi+J0Ga6/sU4EOZhBNDvzG5vkmZfcXO8cn", + "pr8rHpuUnRfN0wRLR8TvPu6/V2a2xo5+eNZVSIKuJEekZK0nkZ352n/qpd81X+uDN0jfO3V70nSMkpLM", + "Wc7sFHa6cajKVeg6KpnjMR4KeoE3083/AgAA//8UylEOtyoAAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/api/codegen_type_gen.go b/api/codegen_type_gen.go index 6453c5f..98a2636 100644 --- a/api/codegen_type_gen.go +++ b/api/codegen_type_gen.go @@ -12,6 +12,28 @@ const ( BearerAuthScopes = "bearerAuth.Scopes" ) +// Action defines model for Action. 
+type Action struct {
+	Action        string   `json:"action"`
+	Actionlogfile string   `json:"actionlogfile"`
+	Argv          []string `json:"argv"`
+	Begin         string   `json:"begin"`
+	Cron          bool     `json:"cron"`
+	End           string   `json:"end"`
+	Path          string   `json:"path"`
+	SessionUuid   string   `json:"session_uuid"`
+	Status        string   `json:"status"`
+	Uuid          string   `json:"uuid"`
+
+	// Version the opensvc client data version
+	Version string `json:"version"`
+}
+
+// ActionRequestAccepted defines model for ActionRequestAccepted.
+type ActionRequestAccepted struct {
+	Uuid string `json:"uuid"`
+}
+
 // Disk defines model for Disk.
 type Disk struct {
 	Dg string `json:"dg"`
@@ -161,6 +183,12 @@ type PostFeedInstanceStatusParams struct {
 	Sync *InQuerySync `form:"sync,omitempty" json:"sync,omitempty"`
 }
 
+// PostFeedActionJSONRequestBody defines body for PostFeedAction for application/json ContentType.
+type PostFeedActionJSONRequestBody = Action
+
+// PutFeedActionEndJSONRequestBody defines body for PutFeedActionEnd for application/json ContentType.
+type PutFeedActionEndJSONRequestBody = Action
+
 // PostFeedDaemonPingJSONRequestBody defines body for PostFeedDaemonPing for application/json ContentType.
 type PostFeedDaemonPingJSONRequestBody = PostFeedDaemonPing
 
diff --git a/apihandlers/post_feed_action.go b/apihandlers/post_feed_action.go
new file mode 100644
index 0000000..f1792ce
--- /dev/null
+++ b/apihandlers/post_feed_action.go
@@ -0,0 +1,83 @@
+package apihandlers
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strings"
+
+	"github.com/google/uuid"
+	"github.com/labstack/echo/v4"
+
+	"github.com/opensvc/oc3/api"
+	"github.com/opensvc/oc3/cachekeys"
+)
+
+// {
+//	"action": "thaw",
+//	"argv": [
+//		"foo",
+//		"thaw",
+//		"--local"
+//	],
+//	"begin": "2026-01-12 10:57:12",
+//	"cron": false,
+//	"path": "foo",
+//	"session_uuid": "b9d795bc-498e-4c20-aada-9feec2eaa947",
+//	"version": "2.1-1977"
+// }
+
+// PostFeedAction handles POST /feed/action
+func (a *Api) PostFeedAction(c echo.Context) error {
+	keyH := cachekeys.FeedActionH
+	keyQ := cachekeys.FeedActionQ
+	keyPendingH := cachekeys.FeedActionPendingH
+
+	log := getLog(c)
+
+	nodeID := nodeIDFromContext(c)
+	if nodeID == "" {
+		log.Debug("node auth problem")
+		return JSONNodeAuthProblem(c)
+	}
+
+	clusterID := clusterIDFromContext(c)
+	if clusterID == "" {
+		return JSONProblemf(c, http.StatusConflict, "Refused", "authenticated node doesn't define cluster id")
+	}
+
+	var payload api.PostFeedActionJSONRequestBody
+	if err := c.Bind(&payload); err != nil {
+		return JSONProblem(c, http.StatusBadRequest, "Failed to json decode request body", err.Error())
+	}
+
+	if !strings.HasPrefix(payload.Version, "2.") && !strings.HasPrefix(payload.Version, "3.") {
+		log.Error(fmt.Sprintf("unexpected version %s", payload.Version))
+		return JSONProblemf(c, http.StatusBadRequest, "BadRequest", "unsupported data client version: %s", payload.Version)
+	}
+
+	b, err := json.Marshal(payload)
+	if err != nil {
+		return JSONProblem(c, http.StatusInternalServerError, "Failed to re-encode config", err.Error())
+	}
+
+	reqCtx := c.Request().Context()
+
+	actionUUID := uuid.New().String()
+	idx := fmt.Sprintf("%s@%s@%s:%s", payload.Path, nodeID, clusterID, actionUUID)
+
+	s := fmt.Sprintf("HSET %s %s", keyH, idx)
+	if _, err := a.Redis.HSet(reqCtx, keyH, idx, b).Result(); err != nil {
+		s = fmt.Sprintf("%s: %s", s, err)
+		log.Error(s)
+		return JSONProblem(c, http.StatusInternalServerError, "", s)
+	}
+
+	if err := a.pushNotPending(reqCtx, keyPendingH, keyQ, idx); err != nil {
+		log.Error(fmt.Sprintf("can't push %s %s: %s", keyQ, idx, err))
+		return JSONProblemf(c, http.StatusInternalServerError, "redis operation", "can't push %s %s: %s", keyQ, idx, err)
+	}
+
+	log.Debug("action begin accepted")
+	return c.JSON(http.StatusAccepted, api.ActionRequestAccepted{Uuid: actionUUID})
+}
diff --git a/apihandlers/put_feed_action.go b/apihandlers/put_feed_action.go
new file mode 100644
index 0000000..a305a39
--- /dev/null
+++ b/apihandlers/put_feed_action.go
@@ -0,0 +1,89 @@
+package apihandlers
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/labstack/echo/v4"
+
+	"github.com/opensvc/oc3/api"
+	"github.com/opensvc/oc3/cachekeys"
+)
+
+// {
+//	"uuid": "ea9a8373-3dda-4fe7-8c4b-08f5290c6c8b",
+//	"path": "foo",
+//	"action": "thaw",
+//	"begin": "2026-01-16 15:05:00",
+//	"end": "2026-01-16 15:06:00",
+//	"cron": false,
+//	"session_uuid": "7f2df7b2-8a4a-4bc1-9a8b-03acffaacd45",
+//	"actionlogfile": "/var/tmp/opensvc/foo.freezeupdfl98l",
+//	"status": "ok"
+// }
+
+// PutFeedActionEnd handles PUT /feed/action
+func (a *Api) PutFeedActionEnd(c echo.Context) error {
+	keyH := cachekeys.FeedActionH
+	keyQ := cachekeys.FeedActionQ
+	keyPendingH := cachekeys.FeedActionPendingH
+
+	log := getLog(c)
+
+	nodeID := nodeIDFromContext(c)
+	if nodeID == "" {
+		log.Debug("node auth problem")
+		return JSONNodeAuthProblem(c)
+	}
+
+	clusterID := clusterIDFromContext(c)
+	if clusterID == "" {
+		return JSONProblemf(c, http.StatusConflict, "Refused", "authenticated node doesn't define cluster id")
+	}
+
+	var payload api.PutFeedActionEndJSONRequestBody
+	if err := c.Bind(&payload); err != nil {
+		return JSONProblem(c, http.StatusBadRequest, "Failed to json decode request body", err.Error())
+	}
+
+	b, err := json.Marshal(payload)
+	if err != nil {
+		return JSONProblem(c, http.StatusInternalServerError, "Failed to re-encode config", err.Error())
+	}
+
+	reqCtx := c.Request().Context()
+
+	idx := fmt.Sprintf("%s@%s@%s:%s", payload.Path, nodeID, clusterID, payload.Uuid)
+
+	// if the begin action is still pending, merge the end info into the stored begin payload to avoid separate begin and end processing
+	if n, err := a.Redis.HExists(reqCtx, keyPendingH, idx).Result(); err == nil && n {
+		if currentBytes, err := a.Redis.HGet(reqCtx, keyH, idx).Bytes(); err == nil {
+			var currentAction api.Action
+			if err := json.Unmarshal(currentBytes, &currentAction); err == nil {
+				currentAction.End = payload.End
+				currentAction.Status = payload.Status
+				currentAction.Actionlogfile = payload.Actionlogfile
+
+				if updatedBytes, err := json.Marshal(currentAction); err == nil {
+					b = updatedBytes
+				}
+			}
+		}
+	}
+
+	s := fmt.Sprintf("HSET %s %s", keyH, idx)
+	if _, err := a.Redis.HSet(reqCtx, keyH, idx, b).Result(); err != nil {
+		s = fmt.Sprintf("%s: %s", s, err)
+		log.Error(s)
+		return JSONProblem(c, http.StatusInternalServerError, "", s)
+	}
+
+	if err := a.pushNotPending(reqCtx, keyPendingH, keyQ, idx); err != nil {
+		log.Error(fmt.Sprintf("can't push %s %s: %s", keyQ, idx, err))
+		return JSONProblemf(c, http.StatusInternalServerError, "redis operation", "can't push %s %s: %s", keyQ, idx, err)
+	}
+
+	log.Debug("action end accepted")
+	return c.NoContent(http.StatusAccepted)
+}
diff --git a/cachekeys/main.go b/cachekeys/main.go
index 897f709..17b582e 100644
--- a/cachekeys/main.go
+++ b/cachekeys/main.go
@@ -34,4 +34,8 @@ const (
 	FeedInstanceStatusQ        = "oc3:q:feed_instance_status"
 	FeedInstanceStatusP        = "oc3:p:feed_instance_status"
 	FeedInstanceStatusPendingH = "oc3:h:feed_instance_status_pending"
+
+	FeedActionH        = "oc3:h:feed_action"
+	FeedActionQ        = 
"oc3:q:feed_action" + FeedActionPendingH = "oc3:h:feed_action_pending" ) diff --git a/cdb/db_actions.go b/cdb/db_actions.go index 1986bdf..feca3bd 100644 --- a/cdb/db_actions.go +++ b/cdb/db_actions.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "time" "github.com/google/uuid" ) @@ -209,3 +210,162 @@ func (oDb *DB) GetUnfinishedActions(ctx context.Context) (lines []SvcAction, err return } + +func (oDb *DB) InsertSvcAction(ctx context.Context, svcID, nodeID uuid.UUID, action string, begin time.Time, status_log string, sid string, cron bool, end time.Time, status string) (int64, error) { + query := "INSERT INTO svcactions (svc_id, node_id, action, begin, status_log, sid, cron" + placeholders := "?, ?, ?, ?, ?, ?, ?" + args := []any{svcID, nodeID, action, begin, status_log, sid, cron} + + if !end.IsZero() { + query += ", end" + placeholders += ", ?" + args = append(args, end) + } + if status != "" { + query += ", status" + placeholders += ", ?" + args = append(args, status) + } + query += fmt.Sprintf(") VALUES (%s)", placeholders) + + result, err := oDb.DB.ExecContext(ctx, query, args...) + if err != nil { + return 0, err + } + + id, err := result.LastInsertId() + if err != nil { + return 0, err + } + + if rowsAffected, err := result.RowsAffected(); err != nil { + return id, err + } else if rowsAffected > 0 { + oDb.SetChange("svcactions") + } + + return id, nil +} + +func (oDb *DB) UpdateSvcAction(ctx context.Context, svcActionID int64, end time.Time, status string) error { + const query = `UPDATE svcactions SET end = ?, status = ?, time = TIMESTAMPDIFF(SECOND, begin, ?) WHERE id = ?` + result, err := oDb.DB.ExecContext(ctx, query, end, status, end, svcActionID) + if err != nil { + return err + } + if rowsAffected, err := result.RowsAffected(); err != nil { + return err + } else if rowsAffected > 0 { + oDb.SetChange("svcactions") + } + return nil +} + +// FindActionID finds the action ID for the given parameters. +func (oDb *DB) FindActionID(ctx context.Context, nodeID string, svcID string, begin time.Time, action string) (int64, error) { + // todo : check if there is only one result + const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND begin = ? AND action = ? AND pid IS NULL" + var id int64 + if err := oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, begin, action).Scan(&id); err != nil { + return 0, err + } + return id, nil +} + +// UpdateActionErrors updates the action errors in the database. +func (oDb *DB) UpdateActionErrors(ctx context.Context, svcID string, nodeID string) error { + const queryCount = `SELECT count(id) FROM svcactions a + WHERE + a.svc_id = ? AND + a.node_id = ? AND + a.status = "err" AND + ((a.ack <> 1) OR (a.ack IS NULL)) AND + a.begin > DATE_SUB(NOW(), INTERVAL 2 DAY)` + + var errCount int + if err := oDb.DB.QueryRowContext(ctx, queryCount, svcID, nodeID).Scan(&errCount); err != nil { + return fmt.Errorf("UpdateActionErrors: count failed: %w", err) + } + + if errCount == 0 { + const queryDelete = `DELETE FROM b_action_errors + WHERE + svc_id = ? AND + node_id = ?` + if _, err := oDb.DB.ExecContext(ctx, queryDelete, svcID, nodeID); err != nil { + return fmt.Errorf("UpdateActionErrors: delete failed: %w", err) + } + } else { + const queryInsert = `INSERT INTO b_action_errors + SET + svc_id= ?, + node_id= ?, + err= ? 
+ ON DUPLICATE KEY UPDATE + err= ?` + if _, err := oDb.DB.ExecContext(ctx, queryInsert, svcID, nodeID, errCount, errCount); err != nil { + return fmt.Errorf("UpdateActionErrors: insert/update failed: %w", err) + } + } + return nil +} + +// UpdateDashActionErrors updates the dashboard with action errors. +func (oDb *DB) UpdateDashActionErrors(ctx context.Context, svcID string, nodeID string) error { + const query = `SELECT e.err, s.svc_env FROM b_action_errors e + JOIN services s ON e.svc_id=s.svc_id + WHERE + e.svc_id = ? AND + e.node_id = ?` + + var errCount int + var svcEnv sql.NullString + var err error + + err = oDb.DB.QueryRowContext(ctx, query, svcID, nodeID).Scan(&errCount, &svcEnv) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("UpdateDashActionErrors: select failed: %w", err) + } + + if err == nil { + sev := 3 + if svcEnv.Valid && svcEnv.String == "PRD" { + sev = 4 + } + const queryInsert = `INSERT INTO dashboard + SET + dash_type="action errors", + svc_id = ?, + node_id = ?, + dash_severity = ?, + dash_fmt = "%(err)s action errors", + dash_dict = ?, + dash_created = NOW(), + dash_updated = NOW(), + dash_env = ? + ON DUPLICATE KEY UPDATE + dash_severity = ?, + dash_fmt = "%(err)s action errors", + dash_dict = ?, + dash_updated = NOW(), + dash_env = ?` + + dashDict := fmt.Sprintf(`{"err": "%d"}`, errCount) + + if _, err := oDb.DB.ExecContext(ctx, queryInsert, svcID, nodeID, sev, dashDict, svcEnv, sev, dashDict, svcEnv); err != nil { + return fmt.Errorf("UpdateDashActionErrors: insert/update failed: %w", err) + } + // TODO: WebSocket notification + } else { + // TODO: WebSocket notification + const queryDelete = `DELETE FROM dashboard + WHERE + dash_type="action errors" AND + svc_id = ? AND + node_id = ?` + if _, err := oDb.DB.ExecContext(ctx, queryDelete, svcID, nodeID); err != nil { + return fmt.Errorf("UpdateDashActionErrors: delete failed: %w", err) + } + } + return nil +} diff --git a/cdb/db_dashboard.go b/cdb/db_dashboard.go index 748399d..980723a 100644 --- a/cdb/db_dashboard.go +++ b/cdb/db_dashboard.go @@ -2,8 +2,11 @@ package cdb import ( "context" + "crypto/md5" "database/sql" + "encoding/json" "fmt" + "strings" "time" ) @@ -782,3 +785,214 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error { } return nil } + +func (oDb *DB) DashboardUpdatePkgDiffForNode(ctx context.Context, nodeID string) error { + request := `SET @now = NOW()` + _, err := oDb.DB.ExecContext(ctx, request) + if err != nil { + return err + } + + processSvcID := func(svcID, monSvctype, monVmtype string) error { + var query string + if monVmtype != "" { + // encap peers + query = ` + SELECT DISTINCT nodes.node_id, nodes.nodename + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.svc_id = ? + AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 20 MINUTE) + AND svcmon.mon_vmtype != "" + ORDER BY nodes.nodename + ` + } else { + // non-encap peers + query = ` + SELECT DISTINCT nodes.node_id, nodes.nodename + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.svc_id = ? 
+ AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 20 MINUTE) + AND svcmon.mon_vmtype = "" + ORDER BY nodes.nodename + ` + } + + rows, err := oDb.DB.QueryContext(ctx, query, svcID) + if err != nil { + return fmt.Errorf("failed to query nodes: %v", err) + } + defer rows.Close() + + var nodeIDs []string + var nodenames []string + for rows.Next() { + var nodeID string + var nodename string + if err := rows.Scan(&nodeID, &nodename); err != nil { + return fmt.Errorf("failed to scan node row: %v", err) + } + nodeIDs = append(nodeIDs, nodeID) + nodenames = append(nodenames, nodename) + } + + if len(nodeIDs) < 2 { + return nil + } + + // Count pkg diffs + var pkgDiffCount int + placeholders := make([]string, len(nodeIDs)) + args := make([]any, len(nodeIDs)) + for i, id := range nodeIDs { + placeholders[i] = "?" + args[i] = id + } + query = fmt.Sprintf(` + SELECT COUNT(pkg_name) + FROM ( + SELECT + pkg_name, + pkg_version, + pkg_arch, + pkg_type, + COUNT(DISTINCT node_id) AS c + FROM packages + WHERE + node_id IN (%s) + AND pkg_name NOT LIKE "gpg-pubkey%%" + GROUP BY + pkg_name, + pkg_version, + pkg_arch, + pkg_type + ) AS t + WHERE t.c != ? + `, strings.Join(placeholders, ",")) + args = append(args, len(nodeIDs)) + + err = oDb.DB.QueryRowContext(ctx, query, args...).Scan(&pkgDiffCount) + if err != nil { + return fmt.Errorf("failed to count package differences: %v", err) + } + + if pkgDiffCount == 0 { + return nil + } + + sev := 0 + if monSvctype == "PRD" { + sev = 1 + } + + // truncate too long node names list + skip := 0 + trail := "" + nodesStr := strings.Join(nodenames, ",") + for len(nodesStr)+len(trail) > 50 { + skip++ + nodenames = nodenames[:len(nodenames)-1] + nodesStr = strings.Join(nodenames, ",") + trail = fmt.Sprintf(", ... (+%d)", skip) + } + nodesStr += trail + + // Format dash_dict JSON content + dashDict := map[string]any{ + "n": pkgDiffCount, + "nodes": nodesStr, + } + dashDictJSON, err := json.Marshal(dashDict) + if err != nil { + return fmt.Errorf("failed to marshal dash_dict: %v", err) + } + + dashDictMD5 := fmt.Sprintf("%x", md5.Sum(dashDictJSON)) + + query = ` + INSERT INTO dashboard + SET + dash_type = "package differences in cluster", + svc_id = ?, + node_id = "", + dash_severity = ?, + dash_fmt = "%(n)s package differences in cluster %(nodes)s", + dash_dict = ?, + dash_dict_md5 = ?, + dash_created = @now, + dash_updated = @now, + dash_env = ? + ON DUPLICATE KEY UPDATE + dash_severity = ?, + dash_fmt = "%(n)s package differences in cluster %(nodes)s", + dash_dict = ?, + dash_dict_md5 = ?, + dash_updated = @now, + dash_env = ? + ` + + _, err = oDb.DB.ExecContext(ctx, query, + svcID, sev, dashDictJSON, dashDictMD5, monSvctype, + sev, dashDictJSON, dashDictMD5, monSvctype, + ) + if err != nil { + return fmt.Errorf("failed to insert/update dashboard: %v", err) + } + + return nil + } + + // Get the list of svc_id for instances having recently updated status + rows, err := oDb.DB.QueryContext(ctx, ` + SELECT svcmon.svc_id, svcmon.mon_svctype, svcmon.mon_vmtype + FROM svcmon + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.node_id = ? 
AND svcmon.mon_updated > DATE_SUB(NOW(), INTERVAL 19 MINUTE) + `, nodeID) + if err != nil { + return fmt.Errorf("failed to query svcmon: %v", err) + } + defer rows.Close() + + var svcIDs []any + todo := make(map[string]func() error) + for rows.Next() { + var svcID string + var monSvctype, monVmtype sql.NullString + if err := rows.Scan(&svcID, &monSvctype, &monVmtype); err != nil { + return fmt.Errorf("failed to scan svcmon row: %v", err) + } + + // Remember which svc_id needs non-updated alert clean up + svcIDs = append(svcIDs, svcID) + + // Defer after rows.Close() to avoid busy db conn errors + todo[svcID] = func() error { + return processSvcID(svcID, monSvctype.String, monVmtype.String) + } + } + rows.Close() + for svcID, fn := range todo { + if err := fn(); err != nil { + return fmt.Errorf("failed to process svc_id %s: %v", svcID, err) + } + } + + // Clean up non updated alerts + if len(svcIDs) > 0 { + query := fmt.Sprintf(` + DELETE FROM dashboard + WHERE svc_id IN (%s) + AND dash_type = "package differences in cluster" + AND dash_updated < @now + `, Placeholders(len(svcIDs))) + + _, err := oDb.DB.ExecContext(ctx, query, svcIDs...) + if err != nil { + return fmt.Errorf("failed to delete old dashboard entries: %v", err) + } + } + + return nil +} diff --git a/cdb/db_nodes.go b/cdb/db_nodes.go index 6c3f85c..26d216f 100644 --- a/cdb/db_nodes.go +++ b/cdb/db_nodes.go @@ -28,6 +28,7 @@ type ( EnclosureSlot string Enclosure string Hv string + Tz string } ) @@ -43,19 +44,19 @@ func (oDb *DB) NodeByNodeID(ctx context.Context, nodeID string) (*DBNode, error) var ( query = `SELECT nodename, cluster_id, node_env, app, hv, node_frozen, loc_country, loc_city, loc_addr, loc_building, loc_floor, loc_room, - loc_rack, loc_zip, enclosure, enclosureslot + loc_rack, loc_zip, enclosure, enclosureslot, tz FROM nodes WHERE node_id = ? LIMIT 1` nodename, clusterID, nodeEnv, app, hv, frozen, locCountry sql.NullString locCity, locAddr, locBuilding, locFloor, locRoom, locRack sql.NullString - locZip, enclosure, enclosureSlot sql.NullString + locZip, enclosure, enclosureSlot, tz sql.NullString ) err := oDb.DB. QueryRowContext(ctx, query, nodeID). Scan( &nodename, &clusterID, &nodeEnv, &app, &hv, &frozen, &locCountry, &locCity, &locAddr, &locBuilding, &locFloor, &locRoom, - &locRack, &locZip, &enclosure, &enclosureSlot) + &locRack, &locZip, &enclosure, &enclosureSlot, &tz) switch { case errors.Is(err, sql.ErrNoRows): return nil, nil @@ -80,6 +81,7 @@ func (oDb *DB) NodeByNodeID(ctx context.Context, nodeID string) (*DBNode, error) Enclosure: enclosure.String, EnclosureSlot: enclosureSlot.String, Hv: hv.String, + Tz: tz.String, } return &node, nil } @@ -437,3 +439,64 @@ func (oDb *DB) UpdateVirtualAssets(ctx context.Context) error { } return nil } + +func (oDb *DB) UpdateVirtualAsset(ctx context.Context, svcID, nodeID string) error { + request := ` + UPDATE nodes n + JOIN ( + SELECT + svcmon.mon_vmname AS vmname, + services.svc_app AS svc_app, + nodes.app AS node_app, + nodes.loc_addr, + nodes.loc_city, + nodes.loc_zip, + nodes.loc_room, + nodes.loc_building, + nodes.loc_floor, + nodes.loc_rack, + nodes.power_cabinet1, + nodes.power_cabinet2, + nodes.power_supply_nb, + nodes.power_protect, + nodes.power_protect_breaker, + nodes.power_breaker1, + nodes.power_breaker2, + nodes.loc_country, + nodes.enclosure + FROM svcmon + JOIN services ON svcmon.svc_id = services.svc_id + JOIN nodes ON svcmon.node_id = nodes.node_id + WHERE svcmon.svc_id = ? AND svcmon.node_id = ? 
+	) AS source
+	SET
+		n.loc_addr = COALESCE(NULLIF(source.loc_addr, ''), n.loc_addr),
+		n.loc_city = COALESCE(NULLIF(source.loc_city, ''), n.loc_city),
+		n.loc_zip = COALESCE(NULLIF(source.loc_zip, ''), n.loc_zip),
+		n.loc_room = COALESCE(NULLIF(source.loc_room, ''), n.loc_room),
+		n.loc_building = COALESCE(NULLIF(source.loc_building, ''), n.loc_building),
+		n.loc_floor = COALESCE(NULLIF(source.loc_floor, ''), n.loc_floor),
+		n.loc_rack = COALESCE(NULLIF(source.loc_rack, ''), n.loc_rack),
+		n.power_cabinet1 = COALESCE(NULLIF(source.power_cabinet1, ''), n.power_cabinet1),
+		n.power_cabinet2 = COALESCE(NULLIF(source.power_cabinet2, ''), n.power_cabinet2),
+		n.power_supply_nb = COALESCE(NULLIF(source.power_supply_nb, ''), n.power_supply_nb),
+		n.power_protect = COALESCE(NULLIF(source.power_protect, ''), n.power_protect),
+		n.power_protect_breaker = COALESCE(NULLIF(source.power_protect_breaker, ''), n.power_protect_breaker),
+		n.power_breaker1 = COALESCE(NULLIF(source.power_breaker1, ''), n.power_breaker1),
+		n.power_breaker2 = COALESCE(NULLIF(source.power_breaker2, ''), n.power_breaker2),
+		n.loc_country = COALESCE(NULLIF(source.loc_country, ''), n.loc_country),
+		n.enclosure = COALESCE(NULLIF(source.enclosure, ''), n.enclosure)
+	WHERE
+		n.nodename = source.vmname AND
+		n.app IN (source.svc_app, source.node_app)`
+	result, err := oDb.DB.ExecContext(ctx, request, svcID, nodeID)
+	if err != nil {
+		return err
+	}
+	if rowAffected, err := result.RowsAffected(); err != nil {
+		return err
+	} else if rowAffected > 0 {
+		oDb.SetChange("nodes")
+	}
+	return nil
+}
diff --git a/cdb/util.go b/cdb/util.go
index 9038bcf..9de5ba7 100644
--- a/cdb/util.go
+++ b/cdb/util.go
@@ -3,6 +3,8 @@ package cdb
 import (
 	"fmt"
 	"log/slog"
+	"strconv"
+	"strings"
 	"time"
 )
@@ -13,3 +15,35 @@ func logDuration(s string, begin time.Time) {
 func logDurationInfo(s string, begin time.Time) {
 	slog.Info(fmt.Sprintf("STAT: %s elapse: %s", s, time.Since(begin)))
 }
+
+// TODO: move this elsewhere.
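+// ParseTimeWithTimezone parses a "2006-01-02 15:04:05" timestamp in the
+// timezone described by tzStr. Illustrative calls (values are examples, not
+// taken from this patch):
+//
+//	ParseTimeWithTimezone("2026-01-12 10:57:12", "Europe/Paris") // named zone
+//	ParseTimeWithTimezone("2026-01-12 10:57:12", "+01:00")       // fixed offset
+//	ParseTimeWithTimezone("2026-01-12 10:57:12", "")             // UTC fallback
+//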
+// Tz can be either a timezone name (ex: "Europe/Paris") or a UTC offset (ex: "+01:00") +func ParseTimeWithTimezone(dateStr, tzStr string) (time.Time, error) { + const timeFormat = "2006-01-02 15:04:05" + + // Named Tz + if loc, err := time.LoadLocation(tzStr); err == nil { + return time.ParseInLocation(timeFormat, dateStr, loc) + } + + // Offset Tz + tzStr = strings.TrimSpace(tzStr) + if tzStr != "" && (tzStr[0] == '+' || tzStr[0] == '-') { + parts := strings.Split(tzStr[1:], ":") + if len(parts) >= 2 { + hours, errH := strconv.Atoi(parts[0]) + minutes, errM := strconv.Atoi(parts[1]) + if errH == nil && errM == nil { + offset := hours*3600 + minutes*60 + if tzStr[0] == '-' { + offset = -offset + } + loc := time.FixedZone(tzStr, offset) + return time.ParseInLocation(timeFormat, dateStr, loc) + } + } + } + + // Fallback + return time.Parse(timeFormat, dateStr) +} diff --git a/scheduler/main.go b/scheduler/main.go index 7081f81..ed88667 100644 --- a/scheduler/main.go +++ b/scheduler/main.go @@ -44,6 +44,10 @@ func (t *Scheduler) Debugf(format string, args ...any) { func (t *Scheduler) toggleTasks(ctx context.Context, states map[string]State) { for _, task := range Tasks { + if task.period == 0 { + //task.Debugf("skip: no period") + continue + } name := task.Name() storedState, _ := states[name] cachedState, hasCachedState := t.states[name] diff --git a/scheduler/task_alerts.go b/scheduler/task_alerts.go index 56daf20..b6ddf06 100644 --- a/scheduler/task_alerts.go +++ b/scheduler/task_alerts.go @@ -16,7 +16,7 @@ var TaskAlert1M = Task{ TaskAlertInstancesNotUpdated, }, period: time.Minute, - timeout: 15 * time.Minute, + timeout: 15 * time.Second, } var TaskAlert1H = Task{ diff --git a/scheduler/task_metrics.go b/scheduler/task_metrics.go index 9c2f09f..d608fca 100644 --- a/scheduler/task_metrics.go +++ b/scheduler/task_metrics.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "os" "path/filepath" "strings" "time" @@ -25,21 +24,11 @@ var TaskMetrics = Task{ } func MakeWSPFilename(format string, args ...any) (string, error) { - var dir string - candidates := viper.GetStringSlice("scheduler.task.metrics.directories") - if len(candidates) == 0 { - return "", fmt.Errorf("scheduler.task.metrics.directories is not set") + directory := viper.GetString("scheduler.directories.uploads") + if directory == "" { + return "", fmt.Errorf("define scheduler.directories.uploads") } - for _, d := range candidates { - if _, err := os.Stat(d); err == nil { - dir = d - break - } - } - if dir == "" { - return "", fmt.Errorf("scheduler.task.metrics.directories has no existing entry") - } - return filepath.Join(dir, fmt.Sprintf(format+".wsp", args...)), nil + return filepath.Join(directory, "stats", fmt.Sprintf(format+".wsp", args...)), nil } func taskMetrics(ctx context.Context, task *Task) error { diff --git a/scheduler/task_nodes.go b/scheduler/task_nodes.go index 28caa2c..c2c88f4 100644 --- a/scheduler/task_nodes.go +++ b/scheduler/task_nodes.go @@ -8,7 +8,7 @@ import ( var TaskUpdateVirtualAssets = Task{ name: "update_virtual_assets", fn: taskUpdateVirtualAssets, - timeout: time.Minute, + timeout: 10 * time.Second, } func taskUpdateVirtualAssets(ctx context.Context, task *Task) error { diff --git a/scheduler/task_scrub.go b/scheduler/task_scrub.go index da039d7..ad5a707 100644 --- a/scheduler/task_scrub.go +++ b/scheduler/task_scrub.go @@ -93,6 +93,12 @@ var TaskScrubSvcdisks = Task{ timeout: time.Minute, } +var TaskScrubStatic = Task{ + name: "scrub_static", + fn: taskScrubStatic, + timeout: 
time.Minute, +} + var TaskScrubTempviz = Task{ name: "scrub_tempviz", fn: taskScrubTempviz, @@ -157,6 +163,7 @@ var TaskScrub1D = Task{ TaskScrubPatches, TaskScrubPdf, TaskScrubResmon, + TaskScrubStatic, TaskScrubStorArray, TaskScrubSvcdisks, TaskUpdateStorArrayDGQuota, @@ -578,21 +585,12 @@ func taskUpdateStorArrayDGQuota(ctx context.Context, task *Task) error { return odb.Commit() } -func taskScrubTempviz(ctx context.Context, task *Task) error { - threshold := time.Now().Add(-1 * time.Hour) - directories := viper.GetStringSlice("scheduler.task.scrub_tempviz.directories") - if len(directories) == 0 { - slog.Warn("skip: define scheduler.task.scrub_tempviz.directories") - return nil - } +func scrubFiles(pattern string, threshold time.Time) error { var matches []string - for _, directory := range directories { - pattern := filepath.Join(directory, "tempviz*") - if m, err := filepath.Glob(pattern); err != nil { - return fmt.Errorf("failed to glob files: %w", err) - } else { - matches = append(matches, m...) - } + if m, err := filepath.Glob(pattern); err != nil { + return fmt.Errorf("failed to glob files: %w", err) + } else { + matches = append(matches, m...) } for _, fpath := range matches { fileInfo, err := os.Stat(fpath) @@ -610,36 +608,53 @@ func taskScrubTempviz(ctx context.Context, task *Task) error { return nil } -func taskScrubPdf(ctx context.Context, task *Task) error { - threshold := time.Now().Add(-24 * time.Hour) - directories := viper.GetStringSlice("scheduler.task.scrub_pdf.directories") - if len(directories) == 0 { - slog.Warn("skip: define scheduler.task.scrub_pdf.directories") +func taskScrubStatic(ctx context.Context, task *Task) error { + threshold := time.Now().Add(-1 * time.Hour) + directory := viper.GetString("scheduler.directories.static") + if directory == "" { + slog.Warn("skip: define scheduler.directories.static") return nil } - var matches []string - for _, directory := range directories { - pattern := filepath.Join(directory, "*-*-*-*-*.pdf") - if m, err := filepath.Glob(pattern); err != nil { - return fmt.Errorf("failed to glob files: %w", err) - } else { - matches = append(matches, m...) 
-	}
-	for _, fpath := range matches {
-		fileInfo, err := os.Stat(fpath)
-		if err != nil {
-			return err
-		}
-		mtime := fileInfo.ModTime()
-		if mtime.Before(threshold) {
-			slog.Info(fmt.Sprintf("rm %s mtime %s", fpath, mtime))
-			if err := os.Remove(fpath); err != nil {
-				return fmt.Errorf("failed to rm %s: %w", fpath, err)
-			}
-		}
-	}
+	if err := scrubFiles(filepath.Join(directory, "tempviz*.png"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "tempviz*.dot"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "stats_*_[0-9]*.png"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "stat_*_[0-9]*.png"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "stats_*_[0-9]*.svg"), threshold); err != nil {
+		return err
+	}
+	if err := scrubFiles(filepath.Join(directory, "*-*-*-*.pdf"), threshold); err != nil {
+		return err
+	}
 	return nil
 }
+
+func taskScrubTempviz(ctx context.Context, task *Task) error {
+	threshold := time.Now().Add(-1 * time.Hour)
+	directory := viper.GetString("scheduler.directories.static")
+	if directory == "" {
+		slog.Warn("skip: define scheduler.directories.static")
+		return nil
+	}
+	return scrubFiles(filepath.Join(directory, "tempviz*"), threshold)
+}
+
+func taskScrubPdf(ctx context.Context, task *Task) error {
+	threshold := time.Now().Add(-24 * time.Hour)
+	directory := viper.GetString("scheduler.directories.static")
+	if directory == "" {
+		slog.Warn("skip: define scheduler.directories.static")
+		return nil
+	}
+	return scrubFiles(filepath.Join(directory, "*-*-*-*-*.pdf"), threshold)
 }
 
 func taskScrubUnfinishedActions(ctx context.Context, task *Task) error {
diff --git a/worker/job.go b/worker/job.go
index 39c9054..8b1ff74 100644
--- a/worker/job.go
+++ b/worker/job.go
@@ -161,9 +161,10 @@ func runOps(ops ...operation) error {
 				With(prometheus.Labels{"desc": op.desc, "status": operationStatusFailed}).
 				Observe(duration.Seconds())
 			if op.blocking {
-				continue
+				return err
 			}
-			return err
+			slog.Warn(fmt.Sprintf("%s: non-blocking error: %s", op.desc, err))
+			continue
 		}
 		operationDuration.
 			With(prometheus.Labels{"desc": op.desc, "status": operationStatusOk}).
 			Observe(duration.Seconds())
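The runOps change above swaps the previous semantics: an error now aborts the job only when the operation is marked blocking, while other failures are logged and the loop moves on. A minimal sketch of the intent, reusing the operation fields from this diff (the descs and handler funcs here are illustrative, not from the patch):

	ops := []operation{
		{desc: "action/getData", do: getData, blocking: true}, // a failure here aborts the job
		{desc: "action/notify", do: notify},                   // a failure here is only logged
	}
	if err := runOps(ops...); err != nil {
		// only the first blocking failure propagates here
	}
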
diff --git a/worker/job_feed_action.go b/worker/job_feed_action.go
new file mode 100644
index 0000000..083c3f4
--- /dev/null
+++ b/worker/job_feed_action.go
@@ -0,0 +1,167 @@
+package worker
+
+import (
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log/slog"
+	"strings"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/opensvc/oc3/api"
+	"github.com/opensvc/oc3/cachekeys"
+	"github.com/opensvc/oc3/cdb"
+)
+
+type jobFeedAction struct {
+	*BaseJob
+
+	nodeID    string
+	clusterID string
+	node      *cdb.DBNode
+
+	// idX is the id of the posted action, with the pattern:
+	// <objectName>@<nodeID>@<clusterID>:<uuid>
+	idX string
+
+	objectName string
+	objectID   string
+
+	// data is the posted action payload
+	data *api.PostFeedActionJSONRequestBody
+
+	rawData []byte
+}
+
+func newAction(objectName, nodeID, clusterID, actionUUID string) *jobFeedAction {
+	idX := fmt.Sprintf("%s@%s@%s:%s", objectName, nodeID, clusterID, actionUUID)
+
+	return &jobFeedAction{
+		BaseJob: &BaseJob{
+			name:            "action",
+			detail:          "ID: " + idX,
+			cachePendingH:   cachekeys.FeedActionPendingH,
+			cachePendingIDX: idX,
+		},
+		idX:        idX,
+		nodeID:     nodeID,
+		clusterID:  clusterID,
+		objectName: objectName,
+	}
+}
+
+func (d *jobFeedAction) Operations() []operation {
+	return []operation{
+		{desc: "action/dropPending", do: d.dropPending},
+		{desc: "action/getData", do: d.getData},
+		{desc: "action/findNodeFromDb", do: d.findNodeFromDb},
+		{desc: "action/findObjectFromDb", do: d.findObjectFromDb},
+		{desc: "action/processAction", do: d.updateDB},
+		{desc: "action/pushFromTableChanges", do: d.pushFromTableChanges},
+	}
+}
+
+func (d *jobFeedAction) getData() error {
+	var data api.PostFeedActionJSONRequestBody
+	if b, err := d.redis.HGet(d.ctx, cachekeys.FeedActionH, d.idX).Bytes(); err != nil {
+		return fmt.Errorf("getData: HGET %s %s: %w", cachekeys.FeedActionH, d.idX, err)
+	} else if err = json.Unmarshal(b, &data); err != nil {
+		return fmt.Errorf("getData: unexpected data from %s %s: %w", cachekeys.FeedActionH, d.idX, err)
+	} else {
+		d.rawData = b
+		d.data = &data
+	}
+
+	slog.Info(fmt.Sprintf("got action data for node %s: %#v", d.nodeID, d.data))
+	return nil
+}
+
+func (d *jobFeedAction) findNodeFromDb() error {
+	if n, err := d.oDb.NodeByNodeID(d.ctx, d.nodeID); err != nil {
+		return fmt.Errorf("findNodeFromDb: node %s: %w", d.nodeID, err)
+	} else if n == nil {
+		// NodeByNodeID returns nil, nil on sql.ErrNoRows
+		return fmt.Errorf("findNodeFromDb: node %s not found", d.nodeID)
+	} else {
+		d.node = n
+	}
+	slog.Info(fmt.Sprintf("jobFeedAction found node %s for id %s", d.node.Nodename, d.nodeID))
+	return nil
+}
+
+func (d *jobFeedAction) findObjectFromDb() error {
+	if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(d.ctx, d.objectName, d.clusterID); err != nil {
+		return fmt.Errorf("find or create object ID failed for %s: %w", d.objectName, err)
+	} else if isNew {
+		d.objectID = objId
+		slog.Info(fmt.Sprintf("jobFeedAction has created new object id %s@%s %s", d.objectName, d.clusterID, objId))
+	} else {
+		d.objectID = objId
+		slog.Info(fmt.Sprintf("jobFeedAction found object id %s@%s %s", d.objectName, d.clusterID, objId))
+	}
+
+	return nil
+}
+
+func (d *jobFeedAction) updateDB() error {
+	if d.data == nil || d.data.Path == "" {
+		return fmt.Errorf("invalid action data: missing path")
+	}
+
+	objectUUID, err := uuid.Parse(d.objectID)
+	if err != nil {
+		return fmt.Errorf("invalid object ID UUID: %w", err)
+	}
+	nodeUUID, err := uuid.Parse(d.nodeID)
+	if err != nil {
+		return fmt.Errorf("invalid node ID UUID: %w", err)
+	}
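+	// Begin/end timestamps arrive in the node's local time; the node's tz
+	// column (loaded by findNodeFromDb) disambiguates them. For example with
+	// tz "+01:00", a begin of "2026-01-12 10:57:12" is 09:57:12 UTC.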
+	beginTime, err := cdb.ParseTimeWithTimezone(d.data.Begin, d.node.Tz)
+	if err != nil {
+		return fmt.Errorf("invalid begin time format: %w", err)
+	}
+
+	statusLog := strings.Join(d.data.Argv, " ")
+
+	if d.data.End != "" {
+		// field End is present, process as action end
+		endTime, err := cdb.ParseTimeWithTimezone(d.data.End, d.node.Tz)
+		if err != nil {
+			return fmt.Errorf("invalid end time format: %w", err)
+		}
+
+		// sql.ErrNoRows means the begin has not been processed yet
+		actionID, err := d.oDb.FindActionID(d.ctx, d.nodeID, d.objectID, beginTime, d.data.Action)
+		if err != nil && !errors.Is(err, sql.ErrNoRows) {
+			return fmt.Errorf("find action ID failed: %w", err)
+		}
+
+		if actionID == 0 {
+			// begin not processed yet, insert a full record
+			if _, err := d.oDb.InsertSvcAction(d.ctx, objectUUID, nodeUUID, d.data.Action, beginTime, statusLog, d.data.SessionUuid, d.data.Cron, endTime, d.data.Status); err != nil {
+				return fmt.Errorf("insert svc action failed: %w", err)
+			}
+		} else {
+			// begin already processed, update the record with the end info
+			if err := d.oDb.UpdateSvcAction(d.ctx, actionID, endTime, d.data.Status); err != nil {
+				return fmt.Errorf("end svc action failed: %w", err)
+			}
+		}
+
+		if d.data.Status == "err" {
+			if err := d.oDb.UpdateActionErrors(d.ctx, d.objectID, d.nodeID); err != nil {
+				return fmt.Errorf("update action errors failed: %w", err)
+			}
+			if err := d.oDb.UpdateDashActionErrors(d.ctx, d.objectID, d.nodeID); err != nil {
+				return fmt.Errorf("update dash action errors failed: %w", err)
+			}
+		}
+
+	} else {
+		// field End is not present, process as action begin
+		if _, err := d.oDb.InsertSvcAction(d.ctx, objectUUID, nodeUUID, d.data.Action, beginTime, statusLog, d.data.SessionUuid, d.data.Cron, time.Time{}, ""); err != nil {
+			return fmt.Errorf("insert svc action failed: %w", err)
+		}
+	}
+
+	return nil
+}
diff --git a/worker/job_feed_system.go b/worker/job_feed_system.go
index 94f95fb..0c06413 100644
--- a/worker/job_feed_system.go
+++ b/worker/job_feed_system.go
@@ -100,7 +100,9 @@ func (d *jobFeedSystem) pkg() error {
 	} else {
 		defer rows.Close()
 	}
-
+	if err := d.oDb.DashboardUpdatePkgDiffForNode(d.ctx, nodeID); err != nil {
+		return err
+	}
 	return nil
 }
diff --git a/worker/worker.go b/worker/worker.go
index 8b38349..aaa8a9a 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -164,6 +164,16 @@ func (w *Worker) runJob(unqueuedJob []string) error {
 			return err
 		}
 		j = newInstanceStatus(objectName, nodeID, clusterID)
+
+	case cachekeys.FeedActionQ:
+		objectName, nodeID, clusterID, uuid, err := w.jobToInstanceClusterIDAndUUID(unqueuedJob[1])
+		if err != nil {
+			err = fmt.Errorf("invalid feed action index %s: %w", unqueuedJob[1], err)
+			slog.Warn(err.Error())
+			return err
+		}
+		j = newAction(objectName, nodeID, clusterID, uuid)
+
 	default:
 		slog.Debug(fmt.Sprintf("ignore queue '%s'", unqueuedJob[0]))
 		return nil
@@ -210,3 +220,17 @@ func (w *Worker) jobToInstanceAndClusterID(jobName string) (path, nodeID, cluste
 	}
 	return
 }
+
+func (w *Worker) jobToInstanceClusterIDAndUUID(jobName string) (path, nodeID, clusterID, uuid string, err error) {
+	l := strings.Split(jobName, ":")
+	if len(l) != 2 {
+		err = fmt.Errorf("unexpected job name: %s", jobName)
+		return
+	}
+	if path, nodeID, clusterID, err = w.jobToInstanceAndClusterID(l[0]); err != nil {
+		err = fmt.Errorf("unexpected job name: %s: %w", jobName, err)
+		return
+	}
+	uuid = l[1]
+	return
+}
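For reference, the cache index written by the handlers and decoded by the worker round-trips through a single convention; a quick sketch with made-up values (helper names as defined in this diff):

	// apihandlers side: POST /feed/action
	idx := fmt.Sprintf("%s@%s@%s:%s", "foo", nodeID, clusterID, actionUUID)
	// e.g. "foo@<node_id>@<cluster_id>:<uuid>"

	// worker side: runJob
	path, nodeID, clusterID, uuid, err := w.jobToInstanceClusterIDAndUUID(idx)
	// splits on the single ":" and then on "@", recovering all four parts

Note this assumes neither the object path nor the IDs contain ":" or "@"; otherwise the len(l) != 2 guard rejects the job name.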