diff --git a/api/api.yaml b/api/api.yaml index e934272..842964f 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -5,7 +5,7 @@ servers: info: title: opensvc collector api - version: 1.0.8 + version: 1.0.9 paths: /feed/daemon/ping: @@ -197,6 +197,64 @@ paths: tags: - agent + /feed/action: + put: + description: | + End an action for a given object path + operationId: PutFeedActionEnd + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/Action' + responses: + 202: + description: action end accepted + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 403: + $ref: '#/components/responses/403' + 500: + $ref: '#/components/responses/500' + security: + - basicAuth: [ ] + - bearerAuth: [ ] + tags: + - agent + post: + description: | + Begin an action for a given object path + operationId: PostFeedAction + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/Action' + responses: + 202: + description: action begin accepted + content: + application/json: + schema: + $ref: '#/components/schemas/ActionRequestAccepted' + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 403: + $ref: '#/components/responses/403' + 500: + $ref: '#/components/responses/500' + security: + - basicAuth: [ ] + - bearerAuth: [ ] + tags: + - agent + /version: get: operationId: GetVersion @@ -292,6 +350,55 @@ components: type: string description: the opensvc client data version + ActionRequestAccepted: + type: object + required: + - uuid + properties: + uuid: + type: string + + Action: + type: object + required: + - path + - action + - version + - begin + - cron + - session_uuid + - argv + - uuid + - end + - actionlogfile + - status + properties: + path: + type: string + action: + type: string + version: + type: string + description: the opensvc client data version + begin: + type: string + cron: + type: boolean + session_uuid: + type: string + argv: + type: array + items: + type: string + uuid: + type: string + end: + type: string + actionlogfile: + type: string + status: + type: string + NodeDisks: type: object properties: @@ -558,3 +665,5 @@ components: items: type: string description: object name + + \ No newline at end of file diff --git a/api/codegen_server_gen.go b/api/codegen_server_gen.go index ab80781..fbbc68a 100644 --- a/api/codegen_server_gen.go +++ b/api/codegen_server_gen.go @@ -24,6 +24,12 @@ type ServerInterface interface { // (GET /docs/openapi) GetSwagger(ctx echo.Context) error + // (POST /feed/action) + PostFeedAction(ctx echo.Context) error + + // (PUT /feed/action) + PutFeedActionEnd(ctx echo.Context) error + // (POST /feed/daemon/ping) PostFeedDaemonPing(ctx echo.Context) error @@ -63,6 +69,32 @@ func (w *ServerInterfaceWrapper) GetSwagger(ctx echo.Context) error { return err } +// PostFeedAction converts echo context to params. +func (w *ServerInterfaceWrapper) PostFeedAction(ctx echo.Context) error { + var err error + + ctx.Set(BasicAuthScopes, []string{}) + + ctx.Set(BearerAuthScopes, []string{}) + + // Invoke the callback with all the unmarshaled arguments + err = w.Handler.PostFeedAction(ctx) + return err +} + +// PutFeedActionEnd converts echo context to params. +func (w *ServerInterfaceWrapper) PutFeedActionEnd(ctx echo.Context) error { + var err error + + ctx.Set(BasicAuthScopes, []string{}) + + ctx.Set(BearerAuthScopes, []string{}) + + // Invoke the callback with all the unmarshaled arguments + err = w.Handler.PutFeedActionEnd(ctx) + return err +} + // PostFeedDaemonPing converts echo context to params. func (w *ServerInterfaceWrapper) PostFeedDaemonPing(ctx echo.Context) error { var err error @@ -201,6 +233,8 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL } router.GET(baseURL+"/docs/openapi", wrapper.GetSwagger) + router.POST(baseURL+"/feed/action", wrapper.PostFeedAction) + router.PUT(baseURL+"/feed/action", wrapper.PutFeedActionEnd) router.POST(baseURL+"/feed/daemon/ping", wrapper.PostFeedDaemonPing) router.POST(baseURL+"/feed/daemon/status", wrapper.PostFeedDaemonStatus) router.POST(baseURL+"/feed/instance/resource_info", wrapper.PostFeedInstanceResourceInfo) @@ -215,37 +249,40 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xZW3PbuBX+Kxi0D8kMV5Lt9DLqU3bTdN121m7ktA+2xwOBRxQ2JMAAh4qVDP97BwCv", - "IkTJ3ijTmfTJFnF4Lt+54vAL5SrLlQSJhs6/0JxplgGCdr+E/FcBervYSu5/0jn9aJ/QiEqWAZ1TY88i", - "avgaMmaJcJvb50ulUmCSlmUZUQ0mV9KAY/pqNrN/uJIIEu2/LM9TwRkKJae/GiXts5bh7zWs6Jz+btpq", - "OvWnZnqt1TKFzEuJwXAtcsuGzumPLCbv4GMBBmkZ0Vezs28h9b1kBa6VFp8h9mIvvoXYt0ovRRyDtDL/", - "8G0AvpQIWrKULEBvQJO/aq20lf8WIH7DIFPyWsjkNeeQI8RPUinXKgeNwkeMWv4KHB8+CVyrAh+4kiuR", - "2IO+QqkwSNSKeHJiA9QQXDMkGj4WQoMh11eLGzJV/GK6AoinnnJaMYyoQMjMkHGHIY3qADeohUysvdUD", - "pjXb0rJ94F8LQRc7dIhBhoUhKDIwyLLckE8iTckSiIaVBrOGmKyUJlLF0Ad24d78P7SHoB3Ds6yrllPr", - "jTAfhvDESaektWqJOPg4UzGkwZMK5pzhOniuIRHeUYMjIz6DPVgpnTGkcyokXpy3YAmJkIBLvMJAV7HO", - "yQZkrHSAv5PtPBjT+a01rK9sJb/i3TCqbY0sQvcDt0T0UhpkksM7MKrQHC7lSg3hFdXTJjj6xx9ga8aP", - "g4htWFrAYVvt6zVxyIR+7EVUB72+w1Q7BJ3ix/DcGxCocpWqZHtYYuUmB+WYJ3zJCIQ4w27fbl/cq9sG", - "tKmitZ97uAaicpBmwwlPBUgkljupX4iOM8Vp1IoJGfWLisFm7Ig9TdCMtTeX9UfUmYheuf9+aupjXyjL", - "8yBUXGVZVZUHZ7HOH1wlGjvsx/+B+hhRkJsg3SqFx4eMPYargz8VcuQUmU4AwwSZkgKVhvhBV9n+wFUh", - "91ArzddgUDMMW76/RrJPnf7UlMPlFoOtw3CVw9PQe2LShQLzWhnszz/DYGn8Gu6xPC0MQtWl9jbOLtWR", - "7bOu7EfI7vb5r9S8T1g3PKCteeO1o++ifWWRr5lMdtIvaL3SRFQl9hgQ9pdbDRuhCvNQ5DFDiB8Y9uLc", - "PvzBjoohKc9552TuqOp3jeABb1QXjGEdB2QiHWr3mqyLjMkfNLCYLVMg8JinTLpxl5gcuFgJTlARXAtD", - "FOeF1iA52PDGNdzJ3Euc3Mlg2WjioS/2Zg3k55ub63qy5DbvXty+e/vTn84vzu4jsgDuVPjjS5KABFvf", - "YrLceplKi0RIYvxNyc6gYe1ISLlO8USBKYQwMWulMdqFxhRZxvR2hzmxfCeEXCJZ/Hz1/p9v7uQvVzfE", - "+4ustMq6iqHar2ZE4NFeQe6kNSkvdK4MGEuUKs5S8dl75QVMkklECiNkYl9lHMUGSHVluZMSEoXC0f6F", - "GAASgPVi8upl0GW7wefDpnFkjVko9nLGP7AEAu1c83APcpmepk/MNL8qCc72yUhjGh+9DpREX5PblHUm", - "VZy94BAiZmswlIwdpI6aq2r6o0arjlF9qZ0DeGRZbkOfziazydnBMNhfcqyVwAstcLuw2npRS2YEf134", - "0cNZ4ZZY9mkra42YW4WXwDTomtr/eltHwt//c1PvwxwLd7rLoyyj5vJT5TRtSq5KU+CoNGG56PhwTs8m", - "s8mfXR/PQdrDOb2YzCYz6qcmZ8g0VtxMG4IvtBrbLK4uwy5jOqd/A1x8YkniNOtt586fuDw6eCG/+kdn", - "/RaKmUb81BK1O7NDtBedXdc4rSVykccS46a3YpkKTu/tM7+t8FuDaV7Pa8rgsMq+80uEZkjat8VxVaqP", - "d2As9PEKBn9U8fbr7euGgsp+bqAuoBw4/fwwinvWemVEz2evhmhlwrh630epXr9E1b6oAr46FYY0inYT", - "lc5veyl6e19GX3ppeHtf3rcOZokFcujftrk/w8MH3bqoO87pHVuJ+vqu3VksumycHZONs/+ZLP9qQVOP", - "9dPmWluX7HD0vHdDuL0QcA32v5oBqRkQy2AkjoJbs9PEU1DU8fHUtzxsqG2IdnQbLGFP5KZD2b1Apdub", - "zfnkrHVQVX/szWVyhH+aTO9+LrsNA96STLuf06xJp/TrUyrEbMSjFTLGLXiIKTgHY1ZFmm7JC7OVfK2V", - "VIV56fvA+WFO7SqeVVWGvGC7nL7fomOb4zSuP0YcV2fcDsi+Y6+lciWSwsfuSCTXC9QTVZd2P/vckrLP", - "plNXE4d/5xoU9IBFkXgiqyNXG9DbEbQXnt9psK6UfS7QtRkM2YHPj99vUvY/Zh6fmNWK8Nik7H1jOE2w", - "9ET85nb/rTKzswiorrN9hTRgoaW9L3eWlIMb77+bo9904x2Dt5a+9x78rPsqZzlbilS4vch96VHVm3rq", - "KHRK53Sq+AUt78v/BgAA//89OreI0iMAAA==", + "H4sIAAAAAAAC/+xaW28buRX+KwTbhwSYlWQ7bVH1ydkkXbfF2o2c9sEWBIpzNOJmhpyQHMVKoP9e8DYX", + "iRrJ3igI4D4lHh6ey3cuPDzUV0xFUQoOXCs8/opLIkkBGqT9i/F/VyDXkzWn7k88xp/MF5xgTgrAY6zM", + "WoIVXUJBDJFel+b7XIgcCMebzSbBElQpuALL9NVoZP6hgmvg2vyXlGXOKNFM8OFvSnDzrWH4RwkLPMZ/", + "GDaaDt2qGt5IMc+hcFJSUFSy0rDBY/yapOg9fKpAabxJ8KvR2feQ+oGTSi+FZF8gdWIvvofYd0LOWZoC", + "NzL/9H0AvuIaJCc5moBcgURvpRTSyH8HkL4hUAh+w3h2SSmUGtJHqVRKUYLUzEWMmP8GVM8+M70UlZ5R", + "wRcsMwtdhXKmNBIL5MiRCVCF9JJoJOFTxSQodHM9uUVDQS+GC4B06CiHnmGCmYZC7TJuMcRJCHClJeOZ", + "sdd/IFKSNd40H9y2GHSpRQcpTXSlkGYFKE2KUqHPLM/RHJCEhQS1hBQthERcpNAFdmJ3/h/aQ9D24bkJ", + "VcuqdUkdg22ASP19Rzm3lItswXKIU8hsZQtnMP6AgQmeQ8bi0qjsqFHX1wQDT6M7SqKX0QUFSjHBZ1XF", + "4jsdetGlvXtWIJVHqusRvQQkSuBqRRHNGXCNUqIJCht23G4PDBtVKR7fOTMC2rgRFMDy0GyZ5cH3CjuQ", + "tl1WGzrdCazEB4Q/QdqZ1o2PPXhs2WCpYlLeMPVxl2maRSHeg3whUsijKz679waChGxfcCv2xcb0QsiC", + "aDzGjOuL88ZZjGvIwNb7SkFbsdbKCngq5GF4rIfaynr5nnfNKNiaGIRicF5xpQmn8B6UqCSFK74Qu/Ay", + "/7VOy+7yR1ir/uV4ApC8gsO2mu2BOGbCdkWQx8SXtAhaxY/huTcgtChFLrL1YYneTRbKPk9M6lKyFeJE", + "t9vFZuNe3U5dYKxGjZiYUb+KFEzG9thTB01fV2Wz/ojjLcHX9n8/18fy1uFUlvGzQhSFbwZ21lJZzuwB", + "2LeoHndqAV9F6RY5PMwK8hCvDm61c9xtr2oiM9BxgkJwpoWEdCZ9ts+oqPgeaiHpEpSWRMPjDktJPrfa", + "orocztc62rEoKkp4HHqPTLpYYN4Ipbtt926w1H6Nt3Y0r5QG3xzt7dfaVEd2baGyHyG73V5+o57xhHXD", + "AdqY1187ui7aVxbpkvBsK/2i1guJmC+xx4Cwv9xKWDFRqVlVpkRDOiO6E+fm40/mhhKT8pQ9J3OHr98B", + "wQPe8Pfa3ToOmrB8V7tLtKwKwn+SQFIyzwHBQ5kTbm9ZSJVA2YJRpAXSS6aQoLSSEjgFE956Cfe8dBIH", + "9zxaNup46Iq9XQL65fb2JlxoqMm7F3fv3/38l/OLs2mCJmD7VPTnlygDDqa+pWi+djKFZBnjSLkLurn6", + "xLVDMeVaxVMz7W4425iopZA62YZGVUVB5HqLOTJ8BwhdaTT55frDv97c81+vb5HzF1pIUbQV02K/mgmC", + "B9OP33NjUlnJUihQhigXlOTsi/PKCxhkgwRVivHMbDXN/wqQvynfcw6Z0MzS/g0pABSB9WLw6mXUZdvB", + "58KmdmTALBZ7JaEfSQaR41zS+BlkMz3PH5lpbkIX7e2znoOpv/U6UBJdTW5S1prkOTvBMUTUWulYMraQ", + "OqqvCvRHtVYto7pSWwvwQIrShD4eDUaDs4NhsL/k2Gs3rSTT64nR1omaE8XoZeVaD2uFvdubr42spdal", + "mw0QCTJQu7/ehUj4x39vwxjWsrCr2zw2m6S+/PicxnXJFXkOVAuJSMlaPhzjs8Fo8Fd7jpfAzeIYXwxG", + "gxF2XZM1ZJgKqoY1wVfs2zaDq82wqxSP8d9BTz6TLLOadYbC54+cWR6cA13/szX1jcVMLX5oiJpR7SHa", + "i9aItZ/WENnII5my3Vs1zxnFU/PNDcma4VIplN4tsK/BlG/CkSO0FZygjK2Ah17JeMDWpy7Sodu4DFMT", + "6QYZr0W6/mbDYc98080CLSvY7Lj3/BtL3R7MRELAgzZ3INaESXiBOOTp0Q8TQaFy4PFdp2bcTTfJ105d", + "uJtupk3EkcygPTVnThUJr7c8fVpwVa3YemsHaz9EeEXdD8bI5+z8utq40fiwDLfDaMl57ybl9ZVs31NF", + "T81pXUJPExgRQccHST/oe96uNgk+H73aRatgynaXXZTCG0PiH0U88H6VKVQregr/NleJJ3j4oFsnob89", + "vWO9qG/v2q3Xs2ddFMIQYVgP0UKDGI+eD/bKj4REVIL5X2CAAgNkGPTEUXRGf5p4iop66nkSN9S03+aY", + "2XlpPJGbDmX3RAvZzFHOB2eNg3z9SYkmgyP8U2d6+zchd3HAG5Jh+zcjxqRT+vUxFWLU41GPjLLjZKQq", + "SkGpRZXna/RCrTldSsFFpV66c+D8MKfmvTm0HugF2eb0fIuOORyHaXj6PK7O2Imz2aOQG8dXLnZ7Ijk8", + "15youjSvQU8tKftsOnU1sfi3hi5RDxgUkSMyOlKxArnuQXvi+J0Ga6/sU4EOZhBNDvzG5vkmZfcXO8cn", + "pr8rHpuUnRfN0wRLR8TvPu6/V2a2xo5+eNZVSIKuJEekZK0nkZ352n/qpd81X+uDN0jfO3V70nSMkpLM", + "Wc7sFHa6cajKVeg6KpnjMR4KeoE3083/AgAA//8UylEOtyoAAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/api/codegen_type_gen.go b/api/codegen_type_gen.go index 6453c5f..98a2636 100644 --- a/api/codegen_type_gen.go +++ b/api/codegen_type_gen.go @@ -12,6 +12,28 @@ const ( BearerAuthScopes = "bearerAuth.Scopes" ) +// Action defines model for Action. +type Action struct { + Action string `json:"action"` + Actionlogfile string `json:"actionlogfile"` + Argv []string `json:"argv"` + Begin string `json:"begin"` + Cron bool `json:"cron"` + End string `json:"end"` + Path string `json:"path"` + SessionUuid string `json:"session_uuid"` + Status string `json:"status"` + Uuid string `json:"uuid"` + + // Version the opensvc client data version + Version string `json:"version"` +} + +// ActionRequestAccepted defines model for ActionRequestAccepted. +type ActionRequestAccepted struct { + Uuid string `json:"uuid"` +} + // Disk defines model for Disk. type Disk struct { Dg string `json:"dg"` @@ -161,6 +183,12 @@ type PostFeedInstanceStatusParams struct { Sync *InQuerySync `form:"sync,omitempty" json:"sync,omitempty"` } +// PostFeedActionJSONRequestBody defines body for PostFeedAction for application/json ContentType. +type PostFeedActionJSONRequestBody = Action + +// PutFeedActionEndJSONRequestBody defines body for PutFeedActionEnd for application/json ContentType. +type PutFeedActionEndJSONRequestBody = Action + // PostFeedDaemonPingJSONRequestBody defines body for PostFeedDaemonPing for application/json ContentType. type PostFeedDaemonPingJSONRequestBody = PostFeedDaemonPing diff --git a/apihandlers/post_feed_action.go b/apihandlers/post_feed_action.go new file mode 100644 index 0000000..f1792ce --- /dev/null +++ b/apihandlers/post_feed_action.go @@ -0,0 +1,83 @@ +package apihandlers + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/google/uuid" + "github.com/labstack/echo/v4" + + "github.com/opensvc/oc3/api" + "github.com/opensvc/oc3/cachekeys" +) + +// { +// "action": "thaw", +// "argv": [ +// "foo", +// "thaw", +// "--local" +// ], +// "begin": "2026-01-12 10:57:12", +// "cron": false, +// "path": "foo", +// "session_uuid": "b9d795bc-498e-4c20-aada-9feec2eaa947", +// "version": "2.1-1977" +// } + +// PostFeedAction handles POST /action/begin +func (a *Api) PostFeedAction(c echo.Context) error { + keyH := cachekeys.FeedActionH + keyQ := cachekeys.FeedActionQ + keyPendingH := cachekeys.FeedActionPendingH + + log := getLog(c) + + nodeID := nodeIDFromContext(c) + if nodeID == "" { + log.Debug("node auth problem") + return JSONNodeAuthProblem(c) + } + + ClusterID := clusterIDFromContext(c) + if ClusterID == "" { + return JSONProblemf(c, http.StatusConflict, "Refused", "authenticated node doesn't define cluster id") + } + + var payload api.PostFeedActionJSONRequestBody + if err := c.Bind(&payload); err != nil { + return JSONProblem(c, http.StatusBadRequest, "Failed to json decode request body", err.Error()) + } + + if !strings.HasPrefix(payload.Version, "2.") && !strings.HasPrefix(payload.Version, "3.") { + log.Error(fmt.Sprintf("unexpected version %s", payload.Version)) + return JSONProblemf(c, http.StatusBadRequest, "BadRequest", "unsupported data client version: %s", payload.Version) + } + + b, err := json.Marshal(payload) + if err != nil { + return JSONProblem(c, http.StatusInternalServerError, "Failed to re-encode config", err.Error()) + } + + reqCtx := c.Request().Context() + + uuid := uuid.New().String() + idx := fmt.Sprintf("%s@%s@%s:%s", payload.Path, nodeID, ClusterID, uuid) + + s := fmt.Sprintf("HSET %s %s", keyH, idx) + if _, err := a.Redis.HSet(reqCtx, keyH, idx, b).Result(); err != nil { + s = fmt.Sprintf("%s: %s", s, err) + log.Error(s) + return JSONProblem(c, http.StatusInternalServerError, "", s) + } + + if err := a.pushNotPending(reqCtx, keyPendingH, keyQ, idx); err != nil { + log.Error(fmt.Sprintf("can't push %s %s: %s", keyQ, idx, err)) + return JSONProblemf(c, http.StatusInternalServerError, "redis operation", "can't push %s %s: %s", keyQ, idx, err) + } + + log.Debug("action begin accepted") + return c.JSON(http.StatusAccepted, api.ActionRequestAccepted{Uuid: uuid}) +} diff --git a/apihandlers/put_feed_action.go b/apihandlers/put_feed_action.go new file mode 100644 index 0000000..a305a39 --- /dev/null +++ b/apihandlers/put_feed_action.go @@ -0,0 +1,89 @@ +package apihandlers + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/labstack/echo/v4" + + "github.com/opensvc/oc3/api" + "github.com/opensvc/oc3/cachekeys" +) + +// { +// "uuid": "ea9a8373-3dda-4fe7-8c4b-08f5290c6c8b", +// "path": "foo", +// "action": "thaw", +// "begin": "2026-01-16 15:05:00", +// "end": "2026-01-16 15:06:00", +// "cron": false, +// "session_uuid": "7f2df7b2-8a4a-4bc1-9a8b-03acffaacd45", +// "actionlogfile": "/var/tmp/opensvc/foo.freezeupdfl98l", +// "status": "ok" +// } + +// PutFeedActionEnd handles PUT /feed/action +func (a *Api) PutFeedActionEnd(c echo.Context) error { + keyH := cachekeys.FeedActionH + keyQ := cachekeys.FeedActionQ + keyPendingH := cachekeys.FeedActionPendingH + + log := getLog(c) + + nodeID := nodeIDFromContext(c) + if nodeID == "" { + log.Debug("node auth problem") + return JSONNodeAuthProblem(c) + } + + ClusterID := clusterIDFromContext(c) + if ClusterID == "" { + return JSONProblemf(c, http.StatusConflict, "Refused", "authenticated node doesn't define cluster id") + } + + var payload api.PutFeedActionEndJSONRequestBody + if err := c.Bind(&payload); err != nil { + return JSONProblem(c, http.StatusBadRequest, "Failed to json decode request body", err.Error()) + } + + b, err := json.Marshal(payload) + if err != nil { + return JSONProblem(c, http.StatusInternalServerError, "Failed to re-encode config", err.Error()) + } + + reqCtx := c.Request().Context() + + idx := fmt.Sprintf("%s@%s@%s:%s", payload.Path, nodeID, ClusterID, payload.Uuid) + + // if action is pending, update the stored begin action with end info to avoid begin and end processing + if n, err := a.Redis.HExists(reqCtx, keyPendingH, idx).Result(); err == nil && n { + if currentBytes, err := a.Redis.HGet(reqCtx, keyH, idx).Bytes(); err == nil { + var currentAction api.Action + if err := json.Unmarshal(currentBytes, ¤tAction); err == nil { + currentAction.End = payload.End + currentAction.Status = payload.Status + currentAction.Actionlogfile = payload.Actionlogfile + + if updatedBytes, err := json.Marshal(currentAction); err == nil { + b = updatedBytes + } + } + } + } + + s := fmt.Sprintf("HSET %s %s", keyH, idx) + if _, err := a.Redis.HSet(reqCtx, keyH, idx, b).Result(); err != nil { + s = fmt.Sprintf("%s: %s", s, err) + log.Error(s) + return JSONProblem(c, http.StatusInternalServerError, "", s) + } + + if err := a.pushNotPending(reqCtx, keyPendingH, keyQ, idx); err != nil { + log.Error(fmt.Sprintf("can't push %s %s: %s", keyQ, idx, err)) + return JSONProblemf(c, http.StatusInternalServerError, "redis operation", "can't push %s %s: %s", keyQ, idx, err) + } + + log.Debug("action end accepted") + return c.NoContent(http.StatusAccepted) +} diff --git a/cachekeys/main.go b/cachekeys/main.go index 897f709..17b582e 100644 --- a/cachekeys/main.go +++ b/cachekeys/main.go @@ -34,4 +34,8 @@ const ( FeedInstanceStatusQ = "oc3:q:feed_instance_status" FeedInstanceStatusP = "oc3:p:feed_instance_status" FeedInstanceStatusPendingH = "oc3:h:feed_instance_status_pending" + + FeedActionH = "oc3:h:feed_action" + FeedActionQ = "oc3:q:feed_action" + FeedActionPendingH = "oc3:h:feed_action_pending" ) diff --git a/cdb/db_actions.go b/cdb/db_actions.go index 1986bdf..feca3bd 100644 --- a/cdb/db_actions.go +++ b/cdb/db_actions.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "time" "github.com/google/uuid" ) @@ -209,3 +210,162 @@ func (oDb *DB) GetUnfinishedActions(ctx context.Context) (lines []SvcAction, err return } + +func (oDb *DB) InsertSvcAction(ctx context.Context, svcID, nodeID uuid.UUID, action string, begin time.Time, status_log string, sid string, cron bool, end time.Time, status string) (int64, error) { + query := "INSERT INTO svcactions (svc_id, node_id, action, begin, status_log, sid, cron" + placeholders := "?, ?, ?, ?, ?, ?, ?" + args := []any{svcID, nodeID, action, begin, status_log, sid, cron} + + if !end.IsZero() { + query += ", end" + placeholders += ", ?" + args = append(args, end) + } + if status != "" { + query += ", status" + placeholders += ", ?" + args = append(args, status) + } + query += fmt.Sprintf(") VALUES (%s)", placeholders) + + result, err := oDb.DB.ExecContext(ctx, query, args...) + if err != nil { + return 0, err + } + + id, err := result.LastInsertId() + if err != nil { + return 0, err + } + + if rowsAffected, err := result.RowsAffected(); err != nil { + return id, err + } else if rowsAffected > 0 { + oDb.SetChange("svcactions") + } + + return id, nil +} + +func (oDb *DB) UpdateSvcAction(ctx context.Context, svcActionID int64, end time.Time, status string) error { + const query = `UPDATE svcactions SET end = ?, status = ?, time = TIMESTAMPDIFF(SECOND, begin, ?) WHERE id = ?` + result, err := oDb.DB.ExecContext(ctx, query, end, status, end, svcActionID) + if err != nil { + return err + } + if rowsAffected, err := result.RowsAffected(); err != nil { + return err + } else if rowsAffected > 0 { + oDb.SetChange("svcactions") + } + return nil +} + +// FindActionID finds the action ID for the given parameters. +func (oDb *DB) FindActionID(ctx context.Context, nodeID string, svcID string, begin time.Time, action string) (int64, error) { + // todo : check if there is only one result + const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND begin = ? AND action = ? AND pid IS NULL" + var id int64 + if err := oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, begin, action).Scan(&id); err != nil { + return 0, err + } + return id, nil +} + +// UpdateActionErrors updates the action errors in the database. +func (oDb *DB) UpdateActionErrors(ctx context.Context, svcID string, nodeID string) error { + const queryCount = `SELECT count(id) FROM svcactions a + WHERE + a.svc_id = ? AND + a.node_id = ? AND + a.status = "err" AND + ((a.ack <> 1) OR (a.ack IS NULL)) AND + a.begin > DATE_SUB(NOW(), INTERVAL 2 DAY)` + + var errCount int + if err := oDb.DB.QueryRowContext(ctx, queryCount, svcID, nodeID).Scan(&errCount); err != nil { + return fmt.Errorf("UpdateActionErrors: count failed: %w", err) + } + + if errCount == 0 { + const queryDelete = `DELETE FROM b_action_errors + WHERE + svc_id = ? AND + node_id = ?` + if _, err := oDb.DB.ExecContext(ctx, queryDelete, svcID, nodeID); err != nil { + return fmt.Errorf("UpdateActionErrors: delete failed: %w", err) + } + } else { + const queryInsert = `INSERT INTO b_action_errors + SET + svc_id= ?, + node_id= ?, + err= ? + ON DUPLICATE KEY UPDATE + err= ?` + if _, err := oDb.DB.ExecContext(ctx, queryInsert, svcID, nodeID, errCount, errCount); err != nil { + return fmt.Errorf("UpdateActionErrors: insert/update failed: %w", err) + } + } + return nil +} + +// UpdateDashActionErrors updates the dashboard with action errors. +func (oDb *DB) UpdateDashActionErrors(ctx context.Context, svcID string, nodeID string) error { + const query = `SELECT e.err, s.svc_env FROM b_action_errors e + JOIN services s ON e.svc_id=s.svc_id + WHERE + e.svc_id = ? AND + e.node_id = ?` + + var errCount int + var svcEnv sql.NullString + var err error + + err = oDb.DB.QueryRowContext(ctx, query, svcID, nodeID).Scan(&errCount, &svcEnv) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("UpdateDashActionErrors: select failed: %w", err) + } + + if err == nil { + sev := 3 + if svcEnv.Valid && svcEnv.String == "PRD" { + sev = 4 + } + const queryInsert = `INSERT INTO dashboard + SET + dash_type="action errors", + svc_id = ?, + node_id = ?, + dash_severity = ?, + dash_fmt = "%(err)s action errors", + dash_dict = ?, + dash_created = NOW(), + dash_updated = NOW(), + dash_env = ? + ON DUPLICATE KEY UPDATE + dash_severity = ?, + dash_fmt = "%(err)s action errors", + dash_dict = ?, + dash_updated = NOW(), + dash_env = ?` + + dashDict := fmt.Sprintf(`{"err": "%d"}`, errCount) + + if _, err := oDb.DB.ExecContext(ctx, queryInsert, svcID, nodeID, sev, dashDict, svcEnv, sev, dashDict, svcEnv); err != nil { + return fmt.Errorf("UpdateDashActionErrors: insert/update failed: %w", err) + } + // TODO: WebSocket notification + } else { + // TODO: WebSocket notification + const queryDelete = `DELETE FROM dashboard + WHERE + dash_type="action errors" AND + svc_id = ? AND + node_id = ?` + if _, err := oDb.DB.ExecContext(ctx, queryDelete, svcID, nodeID); err != nil { + return fmt.Errorf("UpdateDashActionErrors: delete failed: %w", err) + } + } + return nil +} diff --git a/cdb/db_nodes.go b/cdb/db_nodes.go index 6c3f85c..3d53260 100644 --- a/cdb/db_nodes.go +++ b/cdb/db_nodes.go @@ -28,6 +28,7 @@ type ( EnclosureSlot string Enclosure string Hv string + Tz string } ) @@ -43,19 +44,19 @@ func (oDb *DB) NodeByNodeID(ctx context.Context, nodeID string) (*DBNode, error) var ( query = `SELECT nodename, cluster_id, node_env, app, hv, node_frozen, loc_country, loc_city, loc_addr, loc_building, loc_floor, loc_room, - loc_rack, loc_zip, enclosure, enclosureslot + loc_rack, loc_zip, enclosure, enclosureslot, tz FROM nodes WHERE node_id = ? LIMIT 1` nodename, clusterID, nodeEnv, app, hv, frozen, locCountry sql.NullString locCity, locAddr, locBuilding, locFloor, locRoom, locRack sql.NullString - locZip, enclosure, enclosureSlot sql.NullString + locZip, enclosure, enclosureSlot, tz sql.NullString ) err := oDb.DB. QueryRowContext(ctx, query, nodeID). Scan( &nodename, &clusterID, &nodeEnv, &app, &hv, &frozen, &locCountry, &locCity, &locAddr, &locBuilding, &locFloor, &locRoom, - &locRack, &locZip, &enclosure, &enclosureSlot) + &locRack, &locZip, &enclosure, &enclosureSlot, &tz) switch { case errors.Is(err, sql.ErrNoRows): return nil, nil @@ -80,6 +81,7 @@ func (oDb *DB) NodeByNodeID(ctx context.Context, nodeID string) (*DBNode, error) Enclosure: enclosure.String, EnclosureSlot: enclosureSlot.String, Hv: hv.String, + Tz: tz.String, } return &node, nil } diff --git a/cdb/util.go b/cdb/util.go index 9038bcf..9de5ba7 100644 --- a/cdb/util.go +++ b/cdb/util.go @@ -3,6 +3,8 @@ package cdb import ( "fmt" "log/slog" + "strconv" + "strings" "time" ) @@ -13,3 +15,35 @@ func logDuration(s string, begin time.Time) { func logDurationInfo(s string, begin time.Time) { slog.Info(fmt.Sprintf("STAT: %s elapse: %s", s, time.Since(begin))) } + +// Todo : to move elsewhere... +// Tz can be either a timezone name (ex: "Europe/Paris") or a UTC offset (ex: "+01:00") +func ParseTimeWithTimezone(dateStr, tzStr string) (time.Time, error) { + const timeFormat = "2006-01-02 15:04:05" + + // Named Tz + if loc, err := time.LoadLocation(tzStr); err == nil { + return time.ParseInLocation(timeFormat, dateStr, loc) + } + + // Offset Tz + tzStr = strings.TrimSpace(tzStr) + if tzStr != "" && (tzStr[0] == '+' || tzStr[0] == '-') { + parts := strings.Split(tzStr[1:], ":") + if len(parts) >= 2 { + hours, errH := strconv.Atoi(parts[0]) + minutes, errM := strconv.Atoi(parts[1]) + if errH == nil && errM == nil { + offset := hours*3600 + minutes*60 + if tzStr[0] == '-' { + offset = -offset + } + loc := time.FixedZone(tzStr, offset) + return time.ParseInLocation(timeFormat, dateStr, loc) + } + } + } + + // Fallback + return time.Parse(timeFormat, dateStr) +} diff --git a/worker/job_feed_action.go b/worker/job_feed_action.go new file mode 100644 index 0000000..083c3f4 --- /dev/null +++ b/worker/job_feed_action.go @@ -0,0 +1,167 @@ +package worker + +import ( + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/opensvc/oc3/api" + "github.com/opensvc/oc3/cachekeys" + "github.com/opensvc/oc3/cdb" +) + +type jobFeedAction struct { + *BaseJob + + nodeID string + clusterID string + node *cdb.DBNode + + // idX is the id of the posted action begin with the pattern: @@: + idX string + + objectName string + objectID string + + // data is the posted action begin payload + data *api.PostFeedActionJSONRequestBody + + rawData []byte +} + +func newAction(objectName, nodeID, clusterID, uuid string) *jobFeedAction { + idX := fmt.Sprintf("%s@%s@%s:%s", objectName, nodeID, clusterID, uuid) + + return &jobFeedAction{ + BaseJob: &BaseJob{ + name: "action", + detail: "ID: " + idX, + cachePendingH: cachekeys.FeedActionPendingH, + cachePendingIDX: idX, + }, + idX: idX, + nodeID: nodeID, + clusterID: clusterID, + objectName: objectName, + } +} + +func (d *jobFeedAction) Operations() []operation { + return []operation{ + {desc: "actionBegin/dropPending", do: d.dropPending}, + {desc: "actionBegin/getData", do: d.getData}, + {desc: "actionBegin/findNodeFromDb", do: d.findNodeFromDb}, + {desc: "actionBegin/findObjectFromDb", do: d.findObjectFromDb}, + {desc: "actionBegin/processAction", do: d.updateDB}, + {desc: "actionBegin/pushFromTableChanges", do: d.pushFromTableChanges}, + } +} + +func (d *jobFeedAction) getData() error { + var ( + data api.PostFeedActionJSONRequestBody + ) + if b, err := d.redis.HGet(d.ctx, cachekeys.FeedActionH, d.idX).Bytes(); err != nil { + return fmt.Errorf("getData: HGET %s %s: %w", cachekeys.FeedActionH, d.idX, err) + } else if err = json.Unmarshal(b, &data); err != nil { + return fmt.Errorf("getData: unexpected data from %s %s: %w", cachekeys.FeedActionH, d.idX, err) + } else { + d.rawData = b + d.data = &data + } + + slog.Info(fmt.Sprintf("got action begin data for node %s:%#v", d.nodeID, d.data)) + return nil +} + +func (d *jobFeedAction) findNodeFromDb() error { + if n, err := d.oDb.NodeByNodeID(d.ctx, d.nodeID); err != nil { + return fmt.Errorf("findNodeFromDb: node %s: %w", d.nodeID, err) + } else { + d.node = n + } + slog.Info(fmt.Sprintf("jobFeedActionBegin found node %s for id %s", d.node.Nodename, d.nodeID)) + return nil +} + +func (d *jobFeedAction) findObjectFromDb() error { + if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(d.ctx, d.objectName, d.clusterID); err != nil { + return fmt.Errorf("find or create object ID failed for %s: %w", d.objectName, err) + } else if isNew { + slog.Info(fmt.Sprintf("jobFeedActionBegin has created new object id %s@%s %s", d.objectName, d.clusterID, objId)) + } else { + d.objectID = objId + slog.Info(fmt.Sprintf("jobFeedActionBegin found object id %s@%s %s", d.objectName, d.clusterID, objId)) + } + + return nil +} + +func (d *jobFeedAction) updateDB() error { + if d.data == nil || d.data.Path == "" { + return fmt.Errorf("invalid action data: missing path") + } + + objectUUID, err := uuid.Parse(d.objectID) + if err != nil { + return fmt.Errorf("invalid object ID UUID: %w", err) + } + nodeUUID, err := uuid.Parse(d.nodeID) + if err != nil { + return fmt.Errorf("invalid node ID UUID: %w", err) + } + beginTime, err := cdb.ParseTimeWithTimezone(d.data.Begin, d.node.Tz) + if err != nil { + return fmt.Errorf("invalid begin time format: %w", err) + } + + status_log := "" + if len(d.data.Argv) > 0 { + status_log = fmt.Sprintf("%s", d.data.Argv[0]) + for i := 1; i < len(d.data.Argv); i++ { + status_log += " " + d.data.Argv[i] + } + } + + if d.data.End != "" { + // field End is present, process as action end + endTime, err := cdb.ParseTimeWithTimezone(d.data.End, d.node.Tz) + if err != nil { + return fmt.Errorf("invalid end time format: %w", err) + } + + actionId, err := d.oDb.FindActionID(d.ctx, d.nodeID, d.objectID, beginTime, d.data.Action) + if err != nil { + return fmt.Errorf("find action ID failed: %w", err) + } + + if actionId == 0 { + // begin not processed yet, insert full record + if _, err := d.oDb.InsertSvcAction(d.ctx, objectUUID, nodeUUID, d.data.Action, beginTime, status_log, d.data.SessionUuid, d.data.Cron, endTime, d.data.Status); err != nil { + return fmt.Errorf("insert svc action failed: %w", err) + } + } else { + // begin already processed, update record with end info + if err := d.oDb.UpdateSvcAction(d.ctx, actionId, endTime, d.data.Status); err != nil { + return fmt.Errorf("end svc action failed: %w", err) + } + } + + if d.data.Status == "err" { + if err := d.oDb.UpdateActionErrors(d.ctx, d.objectID, d.nodeID); err != nil { + return fmt.Errorf("update action errors failed: %w", err) + } + if err := d.oDb.UpdateDashActionErrors(d.ctx, d.objectID, d.nodeID); err != nil { + return fmt.Errorf("update dash action errors failed: %w", err) + } + } + + } else { + // field End is not present, process as action begin + d.oDb.InsertSvcAction(d.ctx, objectUUID, nodeUUID, d.data.Action, beginTime, status_log, d.data.SessionUuid, d.data.Cron, time.Time{}, "") + } + + return nil +} diff --git a/worker/worker.go b/worker/worker.go index 8b38349..aaa8a9a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -164,6 +164,16 @@ func (w *Worker) runJob(unqueuedJob []string) error { return err } j = newInstanceStatus(objectName, nodeID, clusterID) + + case cachekeys.FeedActionQ: + objectName, nodeID, ClusterID, uuid, err := w.jobToInstanceClusterIdAndUuid(unqueuedJob[1]) + if err != nil { + err := fmt.Errorf("invalid feed begin action index: %s", unqueuedJob[1]) + slog.Warn(err.Error()) + return err + } + j = newAction(objectName, nodeID, ClusterID, uuid) + default: slog.Debug(fmt.Sprintf("ignore queue '%s'", unqueuedJob[0])) return nil @@ -210,3 +220,17 @@ func (w *Worker) jobToInstanceAndClusterID(jobName string) (path, nodeID, cluste } return } + +func (w *Worker) jobToInstanceClusterIdAndUuid(jobName string) (path, nodeID, clusterID, uuid string, err error) { + l := strings.Split(jobName, ":") + if len(l) != 2 { + err = fmt.Errorf("unexpected job name: %s", jobName) + return + } + if path, nodeID, clusterID, err = w.jobToInstanceAndClusterID(l[0]); err != nil { + err = fmt.Errorf("unexpected job name: %s", jobName) + return + } + uuid = l[1] + return +}