Skip to content

Commit 5596039

Browse files
committed
feat: support cancelling restores
1 parent 45fff5a commit 5596039

File tree

5 files changed

+246
-43
lines changed

5 files changed

+246
-43
lines changed

internal/helpers/helper_types.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package helpers
22

3+
import (
4+
"context"
5+
6+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
7+
"k8s.io/apimachinery/pkg/types"
8+
"sigs.k8s.io/controller-runtime/pkg/client"
9+
)
10+
311
// LagoonEnvironmentVariable is used to define Lagoon environment variables.
412
type LagoonEnvironmentVariable struct {
513
Name string `json:"name"`
@@ -31,3 +39,27 @@ type LagoonAPIConfiguration struct {
3139
SSHHost string
3240
SSHPort string
3341
}
42+
43+
func K8UPVersions(ctx context.Context, c client.Client) (bool, bool, error) {
44+
k8upv1alpha1Exists := false
45+
k8upv1Exists := false
46+
crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{}
47+
if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil {
48+
if err := IgnoreNotFound(err); err != nil {
49+
return k8upv1alpha1Exists, k8upv1Exists, err
50+
}
51+
}
52+
if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" {
53+
k8upv1alpha1Exists = true
54+
}
55+
crdv1 := &apiextensionsv1.CustomResourceDefinition{}
56+
if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil {
57+
if err := IgnoreNotFound(err); err != nil {
58+
return k8upv1alpha1Exists, k8upv1Exists, err
59+
}
60+
}
61+
if crdv1.ObjectMeta.Name == "restores.k8up.io" {
62+
k8upv1Exists = true
63+
}
64+
return k8upv1alpha1Exists, k8upv1Exists, nil
65+
}

internal/messenger/consumer.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,14 +406,25 @@ func (m *Messenger) Consumer(targetName string) { //error {
406406
}
407407
}
408408
case "deploytarget:restic:backup:restore", "kubernetes:restic:backup:restore":
409+
v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client)
410+
if err != nil {
411+
//@TODO: send msg back to lagoon and update task to failed?
412+
message.Ack(false) // ack to remove from queue
413+
return
414+
}
415+
if !v1alpha1 && !v1 {
416+
// k8up not installed
417+
message.Ack(false) // ack to remove from queue
418+
return
419+
}
409420
opLog.Info(
410421
fmt.Sprintf(
411422
"Received backup restoration for project %s, environment %s",
412423
jobSpec.Project.Name,
413424
jobSpec.Environment.Name,
414425
),
415426
)
416-
err := m.ResticRestore(namespace, jobSpec)
427+
err = m.ResticRestore(ctx, namespace, jobSpec, v1alpha1, v1, false)
417428
if err != nil {
418429
opLog.Error(err,
419430
fmt.Sprintf(
@@ -426,6 +437,39 @@ func (m *Messenger) Consumer(targetName string) { //error {
426437
message.Ack(false) // ack to remove from queue
427438
return
428439
}
440+
case "deploytarget:restic:cancel:restore":
441+
v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client)
442+
if err != nil {
443+
//@TODO: send msg back to lagoon and update task to failed?
444+
message.Ack(false) // ack to remove from queue
445+
return
446+
}
447+
if !v1alpha1 && !v1 {
448+
// k8up not installed
449+
message.Ack(false) // ack to remove from queue
450+
return
451+
}
452+
// if this is a request to cancel a restore attempt
453+
opLog.Info(
454+
fmt.Sprintf(
455+
"Received restore cancellation for project %s, environment %s",
456+
jobSpec.Project.Name,
457+
jobSpec.Environment.Name,
458+
),
459+
)
460+
err = m.ResticRestore(ctx, namespace, jobSpec, v1alpha1, v1, true)
461+
if err != nil {
462+
opLog.Error(err,
463+
fmt.Sprintf(
464+
"Cancel restore for project %s, environment %s failed",
465+
jobSpec.Project.Name,
466+
jobSpec.Environment.Name,
467+
),
468+
)
469+
//@TODO: send msg back to lagoon and update task to failed?
470+
message.Ack(false) // ack to remove from queue
471+
return
472+
}
429473
case "deploytarget:route:migrate", "kubernetes:route:migrate", "openshift:route:migrate":
430474
opLog.Info(
431475
fmt.Sprintf(

internal/messenger/tasks_restore.go

Lines changed: 122 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,24 @@ import (
66
"fmt"
77

88
"github.com/go-logr/logr"
9+
"github.com/uselagoon/machinery/api/schema"
910
lagoonv1beta2 "github.com/uselagoon/remote-controller/api/lagoon/v1beta2"
1011
"github.com/uselagoon/remote-controller/internal/helpers"
12+
"k8s.io/apimachinery/pkg/types"
13+
1114
ctrl "sigs.k8s.io/controller-runtime"
1215

1316
k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
1417
k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
15-
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
16-
"k8s.io/apimachinery/pkg/types"
1718
)
1819

20+
type cancelRestore struct {
21+
RestoreName string `json:"restoreName"`
22+
BackupID string `json:"backupId"`
23+
}
24+
1925
// ResticRestore handles creating the restic restore jobs.
20-
func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
26+
func (m *Messenger) ResticRestore(ctx context.Context, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec, v1alpha1, v1, cancel bool) error {
2127
opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks")
2228
vers, err := checkRestoreVersionFromCore(jobSpec.Misc.MiscResource)
2329
if err != nil {
@@ -31,51 +37,41 @@ func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.Lagoo
3137
return nil
3238
}
3339

34-
// check if k8up crds exist in the cluster
35-
k8upv1alpha1Exists := false
36-
k8upv1Exists := false
37-
crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{}
38-
if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil {
39-
if err := helpers.IgnoreNotFound(err); err != nil {
40-
return err
41-
}
42-
}
43-
if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" {
44-
k8upv1alpha1Exists = true
45-
}
46-
crdv1 := &apiextensionsv1.CustomResourceDefinition{}
47-
if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil {
48-
if err := helpers.IgnoreNotFound(err); err != nil {
49-
return err
50-
}
51-
}
52-
if crdv1.ObjectMeta.Name == "restores.k8up.io" {
53-
k8upv1Exists = true
54-
}
40+
handlev1alpha1 := false
41+
handlev1 := false
5542
// check the version, if there is no version in the payload, assume it is k8up v2
5643
if m.SupportK8upV2 {
5744
if vers == "backup.appuio.ch/v1alpha1" {
58-
if k8upv1alpha1Exists {
59-
return m.createv1alpha1Restore(opLog, namespace, jobSpec)
45+
if v1alpha1 {
46+
handlev1alpha1 = true
6047
}
6148
} else {
62-
if k8upv1Exists {
63-
if err := m.createv1Restore(opLog, namespace, jobSpec); err != nil {
64-
return err
65-
}
49+
if v1 {
50+
handlev1 = true
6651
} else {
67-
if k8upv1alpha1Exists {
68-
if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil {
69-
return err
70-
}
52+
if v1alpha1 {
53+
handlev1alpha1 = true
7154
}
7255
}
7356
}
7457
} else {
75-
if k8upv1alpha1Exists {
76-
if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil {
77-
return err
78-
}
58+
if v1alpha1 {
59+
handlev1alpha1 = true
60+
}
61+
}
62+
63+
if handlev1alpha1 {
64+
if cancel {
65+
return m.cancelv1alpha1Restore(ctx, opLog, namespace, jobSpec)
66+
} else {
67+
return m.createv1alpha1Restore(ctx, opLog, namespace, jobSpec)
68+
}
69+
}
70+
if handlev1 {
71+
if cancel {
72+
return m.cancelv1Restore(ctx, opLog, namespace, jobSpec)
73+
} else {
74+
return m.createv1Restore(ctx, opLog, namespace, jobSpec)
7975
}
8076
}
8177
return nil
@@ -97,7 +93,7 @@ func checkRestoreVersionFromCore(resource []byte) (string, error) {
9793
}
9894

9995
// createv1alpha1Restore will create a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1)
100-
func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
96+
func (m *Messenger) createv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
10197
restorev1alpha1 := &k8upv1alpha1.Restore{}
10298
if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1alpha1); err != nil {
10399
opLog.Error(err,
@@ -109,7 +105,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j
109105
return err
110106
}
111107
restorev1alpha1.SetNamespace(namespace)
112-
if err := m.Client.Create(context.Background(), restorev1alpha1); err != nil {
108+
if err := m.Client.Create(ctx, restorev1alpha1); err != nil {
113109
opLog.Error(err,
114110
fmt.Sprintf(
115111
"Unable to create restore %s with k8up v1alpha1 api.",
@@ -122,7 +118,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j
122118
}
123119

124120
// createv1Restore will create a restore task using the restores.k8up.io v1 api (k8up v2)
125-
func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
121+
func (m *Messenger) createv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
126122
restorev1 := &k8upv1.Restore{}
127123
if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1); err != nil {
128124
opLog.Error(err,
@@ -134,7 +130,7 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec
134130
return err
135131
}
136132
restorev1.SetNamespace(namespace)
137-
if err := m.Client.Create(context.Background(), restorev1); err != nil {
133+
if err := m.Client.Create(ctx, restorev1); err != nil {
138134
opLog.Error(err,
139135
fmt.Sprintf(
140136
"Unable to create restore %s with k8up v1 api.",
@@ -145,3 +141,87 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec
145141
}
146142
return nil
147143
}
144+
145+
// cancelv1alpha1Restore will attempt to cancel a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1)
146+
func (m *Messenger) cancelv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
147+
restorev1alpha1 := &k8upv1alpha1.Restore{}
148+
cr := &cancelRestore{}
149+
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil {
150+
return err
151+
}
152+
if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1alpha1); helpers.IgnoreNotFound(err) != nil {
153+
opLog.Error(err,
154+
fmt.Sprintf(
155+
"Unable to get restore %s with k8up v1alpha1 api.",
156+
cr.RestoreName,
157+
),
158+
)
159+
return err
160+
}
161+
if restorev1alpha1.Name != "" {
162+
if err := m.Client.Delete(ctx, restorev1alpha1); err != nil {
163+
opLog.Error(err,
164+
fmt.Sprintf(
165+
"Unable to delete restore %s with k8up v1alpha1 api.",
166+
cr.RestoreName,
167+
),
168+
)
169+
return err
170+
}
171+
}
172+
// if no matching restore found, or the restore is deleted, send the cancellation message back to core
173+
m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec)
174+
return nil
175+
}
176+
177+
// cancelv1Restore will attempt to cancel a restore task using the restores.k8up.io v1 api (k8up v2)
178+
func (m *Messenger) cancelv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
179+
restorev1 := &k8upv1.Restore{}
180+
cr := &cancelRestore{}
181+
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil {
182+
return err
183+
}
184+
if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1); helpers.IgnoreNotFound(err) != nil {
185+
opLog.Error(err,
186+
fmt.Sprintf(
187+
"Unable to get restore %s with k8up v1 api.",
188+
cr.RestoreName,
189+
),
190+
)
191+
return err
192+
}
193+
if restorev1.Name != "" {
194+
if err := m.Client.Delete(ctx, restorev1); err != nil {
195+
opLog.Error(err,
196+
fmt.Sprintf(
197+
"Unable to delete restore %s with k8up v1alpha1 api.",
198+
cr.RestoreName,
199+
),
200+
)
201+
return err
202+
}
203+
}
204+
// if no matching restore found, or the restore is deleted, send the cancellation message back to core
205+
m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec)
206+
return nil
207+
}
208+
209+
func (m *Messenger) pubRestoreCancel(opLog logr.Logger, namespace, restorename string, jobSpec *lagoonv1beta2.LagoonTaskSpec) {
210+
msg := schema.LagoonMessage{
211+
Type: "restore:cancel",
212+
Namespace: namespace,
213+
Meta: &schema.LagoonLogMeta{
214+
Environment: jobSpec.Environment.Name,
215+
Project: jobSpec.Project.Name,
216+
JobName: restorename,
217+
},
218+
}
219+
msgBytes, err := json.Marshal(msg)
220+
if err != nil {
221+
opLog.Error(err, "Unable to encode message as JSON")
222+
}
223+
// publish the cancellation result back to lagoon
224+
if err := m.Publish("lagoon-tasks:controller", msgBytes); err != nil {
225+
opLog.Error(err, "Unable to publish message.")
226+
}
227+
}

test/e2e/e2e_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,36 @@ var _ = Describe("controller", Ordered, func() {
372372
Expect(strings.TrimSpace(string(result))).To(Equal(string(testResult)))
373373
}
374374

375+
By("validating that restore cancellations are working")
376+
By("creating a restore cancellation task via rabbitmq")
377+
cmd = exec.Command(
378+
"curl",
379+
"-s",
380+
"-u",
381+
"guest:guest",
382+
"-H",
383+
"'Accept: application/json'",
384+
"-H",
385+
"'Content-Type:application/json'",
386+
"-X",
387+
"POST",
388+
"-d",
389+
"@test/e2e/testdata/cancel-restore.json",
390+
"http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish",
391+
)
392+
_, err = utils.Run(cmd)
393+
ExpectWithOffset(1, err).NotTo(HaveOccurred())
394+
395+
time.Sleep(10 * time.Second)
396+
397+
By("validating that the restore is deleted")
398+
cmd = exec.Command("kubectl", "get",
399+
"restores.k8up.io", "restore-bf072a0-uqxqo4",
400+
"-n", "nginx-example-main",
401+
)
402+
_, err = utils.Run(cmd)
403+
ExpectWithOffset(1, err).To(HaveOccurred())
404+
375405
By("validating that the harbor robot credentials get rotated successfully")
376406
cmd = exec.Command("kubectl", "get",
377407
"pods", "-l", "control-plane=controller-manager",
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{"properties":{"delivery_mode":2},"routing_key":"ci-local-controller-kubernetes:misc",
2+
"payload":"{
3+
\"misc\":{
4+
\"miscResource\":\"eyJyZXN0b3JlTmFtZSI6InJlc3RvcmUtYmYwNzJhMC11cXhxbzQiLCJiYWNrdXBJZCI6ImJmMDcyYTA5ZTE3NzI2ZGE1NGFkYzc5OTM2ZWM4NzQ1NTIxOTkzNTk5ZDQxMjExZGZjOTQ2NmRmZDViYzMyYTUifQ==\"
5+
},
6+
\"key\":\"deploytarget:restic:cancel:restore\",
7+
\"environment\":{
8+
\"name\":\"main\",
9+
\"openshiftProjectName\":\"nginx-example-main\"
10+
},
11+
\"project\":{
12+
\"name\":\"nginx-example\"
13+
},
14+
\"advancedTask\":{}
15+
}",
16+
"payload_encoding":"string"
17+
}

0 commit comments

Comments
 (0)