diff --git a/internal/kube/controller/controller.go b/internal/kube/controller/controller.go index fc60ab5f7..a2d6fd71b 100644 --- a/internal/kube/controller/controller.go +++ b/internal/kube/controller/controller.go @@ -83,9 +83,11 @@ func labelling() internalinterfaces.TweakListOptionsFunc { } } +var eventProcessorCustomizers []watchers.EventProcessorCustomizer + func NewController(cli internalclient.Clients, config *Config) (*Controller, error) { controller := &Controller{ - eventProcessor: watchers.NewEventProcessor("Controller", cli), + eventProcessor: watchers.NewEventProcessor("Controller", cli, eventProcessorCustomizers...), sites: map[string]*site.Site{}, siteSizing: sizing.NewRegistry(), labelling: labels.NewLabelsAndAnnotations(config.Namespace), @@ -448,12 +450,29 @@ func (c *Controller) checkAttachedConnectorBinding(key string, binding *skupperv func (c *Controller) checkAttachedConnector(key string, connector *skupperv2alpha1.AttachedConnector) error { if connector == nil { if previous, ok := c.attachableConnectors[key]; ok { + c.log.Info("AttachedConnector deleted", slog.String("key", key)) delete(c.attachableConnectors, key) return c.getSite(previous.Spec.SiteNamespace).AttachedConnectorDeleted(previous.Namespace, previous.Name) } else { return nil } } else { + if previous, ok := c.attachableConnectors[key]; ok { + if previous.Spec.SiteNamespace != connector.Spec.SiteNamespace { + c.log.Info("AttachedConnector site namespace has changed", + slog.String("key", key), + slog.String("from", previous.Spec.SiteNamespace), + slog.String("to", connector.Spec.SiteNamespace), + ) + err := c.getSite(previous.Spec.SiteNamespace).AttachedConnectorUnreferenced(previous) + if err != nil { + c.log.Error("Error removing AttachedConnector reference from previous namespace", + slog.String("key", key), + slog.String("previous", previous.Spec.SiteNamespace)) + } + } + } + c.attachableConnectors[key] = connector return c.getSite(connector.Spec.SiteNamespace).AttachedConnectorUpdated(connector) } } diff --git a/internal/kube/controller/controller_test.go b/internal/kube/controller/controller_test.go index 431824dc4..aec3c9860 100644 --- a/internal/kube/controller/controller_test.go +++ b/internal/kube/controller/controller_test.go @@ -8,8 +8,10 @@ import ( "reflect" "strings" "testing" + "time" "github.com/google/uuid" + "github.com/skupperproject/skupper/internal/kube/watchers" "gotest.tools/v3/assert" appsv1 "k8s.io/api/apps/v1" @@ -37,6 +39,7 @@ import ( ) type WaitFunction func(t *testing.T, clients internalclient.Clients) bool +type ControllerFunction func(t *testing.T, controller *Controller) bool func TestGeneral(t *testing.T) { fakeNetworkStatus := f.fakeNetworkStatusInfo("test") @@ -605,6 +608,7 @@ func TestUpdate(t *testing.T) { k8sObjects []runtime.Object skupperObjects []runtime.Object functions []WaitFunction + postFunctions []ControllerFunction }{ { name: "change listener host", @@ -640,8 +644,126 @@ func TestUpdate(t *testing.T) { deleteTargetPod("mypod-1", "test"), serviceCheck("mypod-1", "test").checkAbsent, }, + }, { + name: "unreferenced attached connector", + skupperObjects: []runtime.Object{ + f.site("mysite", "test", "", false, false), + f.attachedConnectorBinding("myconnector", "test", "test2"), + f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080), + f.listener("mylistener", "test", "mysvc", 8080), + }, + k8sObjects: []runtime.Object{ + f.pod("foo", "test2", map[string]string{"app": "foo"}, nil, + f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))), + }, + functions: []WaitFunction{ + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + serviceCheck("mysvc", "test").check, + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + updateAttachedConnectorSiteNamespace("myconnector", "test2", "test3", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse), + }, + postFunctions: []ControllerFunction{ + podWatchers(1, 1), + }, + }, { + name: "deleted attached connector", + skupperObjects: []runtime.Object{ + f.site("mysite", "test", "", false, false), + f.attachedConnectorBinding("myconnector", "test", "test2"), + f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080), + f.listener("mylistener", "test", "mysvc", 8080), + }, + k8sObjects: []runtime.Object{ + f.pod("foo", "test2", map[string]string{"app": "foo"}, nil, + f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))), + }, + functions: []WaitFunction{ + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + serviceCheck("mysvc", "test").check, + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + deleteAttachedConnector("myconnector", "test2"), + }, + postFunctions: []ControllerFunction{ + podWatchers(1, 1), + }, + }, { + name: "unreferenced attached connector binding", + skupperObjects: []runtime.Object{ + f.site("mysite", "test", "", false, false), + f.attachedConnectorBinding("myconnector", "test", "test2"), + f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080), + f.listener("mylistener", "test", "mysvc", 8080), + }, + k8sObjects: []runtime.Object{ + f.pod("foo", "test2", map[string]string{"app": "foo"}, nil, + f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))), + }, + functions: []WaitFunction{ + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + serviceCheck("mysvc", "test").check, + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue), + updateAttachedConnectorBindingConnectorNamespace("myconnector", "test", "test3", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse), + isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse), + }, + postFunctions: []ControllerFunction{ + podWatchers(1, 1), + }, + }, { + name: "deleted attached connector binding", + skupperObjects: []runtime.Object{ + f.site("mysite", "test", "", false, false), + f.attachedConnectorBinding("myconnector", "test", "test2"), + f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080), + f.listener("mylistener", "test", "mysvc", 8080), + }, + k8sObjects: []runtime.Object{ + f.pod("foo", "test2", map[string]string{"app": "foo"}, nil, + f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))), + }, + functions: []WaitFunction{ + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + serviceCheck("mysvc", "test").check, + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue), + deleteAttachedConnectorBinding("myconnector", "test"), + isAttachedConnectorStatusCondition("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse), + }, + postFunctions: []ControllerFunction{ + podWatchers(1, 1), + }, + }, { + name: "site deleted", + skupperObjects: []runtime.Object{ + f.addUID(f.site("mysite", "test", "", false, false), "49b03ad4-d414-42be-bbb5-b32d7d4ca503"), + f.attachedConnectorBinding("myconnector", "test", "test2"), + f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080), + f.listener("mylistener", "test", "mysvc", 8080), + }, + k8sObjects: []runtime.Object{ + f.pod("foo", "test2", map[string]string{"app": "foo"}, nil, + f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))), + }, + functions: []WaitFunction{ + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + serviceCheck("mysvc", "test").check, + isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED), + isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue), + deleteSite("mysite", "test"), + }, + postFunctions: []ControllerFunction{ + podWatchers(1, 1), + }, }, } + defer func() { + eventProcessorCustomizers = nil + }() for _, tt := range testTable { t.Run(tt.name, func(t *testing.T) { flags := &flag.FlagSet{} @@ -654,6 +776,11 @@ func TestUpdate(t *testing.T) { clients, err := fakeclient.NewFakeClient(config.Namespace, tt.k8sObjects, tt.skupperObjects, "") assert.Assert(t, err) enableSSA(clients.GetDynamicClient()) + eventProcessorCustomizers = []watchers.EventProcessorCustomizer{ + func(e *watchers.EventProcessor) { + e.SetResyncShort(time.Second) + }, + } controller, err := NewController(clients, config) assert.Assert(t, err) stopCh := make(chan struct{}) @@ -668,10 +795,31 @@ func TestUpdate(t *testing.T) { controller.eventProcessor.TestProcess() } } + for _, f := range tt.postFunctions { + for !f(t, controller) { + controller.eventProcessor.TestProcess() + } + } }) } } +func deleteAttachedConnector(name string, namespace string) WaitFunction { + return func(t *testing.T, clients internalclient.Clients) bool { + err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}) + assert.Assert(t, err) + return true + } +} + +func deleteAttachedConnectorBinding(name string, namespace string) WaitFunction { + return func(t *testing.T, clients internalclient.Clients) bool { + err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}) + assert.Assert(t, err) + return true + } +} + func verifyStatus(t *testing.T, expected skupperv2alpha1.Status, actual skupperv2alpha1.Status) { t.Helper() assert.Equal(t, expected.StatusType, actual.StatusType, actual.Message) @@ -1643,11 +1791,89 @@ func isListenerStatusConditionTrue(name string, namespace string, condition stri } } +func updateAttachedConnectorSiteNamespace(name string, namespace string, siteNamespace string, condition string, conditionStatus metav1.ConditionStatus) WaitFunction { + return func(t *testing.T, clients internalclient.Clients) bool { + cli := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace) + connector, err := cli.Get(context.Background(), name, metav1.GetOptions{}) + assert.Assert(t, err) + if connector.Spec.SiteNamespace != siteNamespace { + t.Logf("updating siteNamespace") + connector.Spec.SiteNamespace = siteNamespace + _, err = cli.Update(context.Background(), connector, metav1.UpdateOptions{}) + assert.Assert(t, err) + return false + } + return isConditionUpToDate(connector.Status.Conditions, condition, connector.ObjectMeta.Generation) && meta.IsStatusConditionPresentAndEqual(connector.Status.Conditions, condition, conditionStatus) + } +} + +func updateAttachedConnectorBindingConnectorNamespace(name string, namespace string, connectorNamespace string, condition string, status metav1.ConditionStatus) WaitFunction { + return func(t *testing.T, clients internalclient.Clients) bool { + cli := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace) + binding, err := cli.Get(context.Background(), name, metav1.GetOptions{}) + assert.Assert(t, err) + if binding.Spec.ConnectorNamespace != connectorNamespace { + t.Logf("updating connectorNamespace") + binding.Spec.ConnectorNamespace = connectorNamespace + _, err = cli.Update(context.Background(), binding, metav1.UpdateOptions{}) + assert.Assert(t, err) + return false + } + return isConditionUpToDate(binding.Status.Conditions, condition, binding.ObjectMeta.Generation) && meta.IsStatusConditionPresentAndEqual(binding.Status.Conditions, condition, status) + } +} + func isAttachedConnectorStatusConditionTrue(name string, namespace string, condition string) WaitFunction { + return isAttachedConnectorStatusCondition(name, namespace, condition, metav1.ConditionTrue) +} + +func isAttachedConnectorStatusCondition(name string, namespace string, condition string, status metav1.ConditionStatus) WaitFunction { return func(t *testing.T, clients internalclient.Clients) bool { connector, err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace).Get(context.Background(), name, metav1.GetOptions{}) assert.Assert(t, err) - return meta.IsStatusConditionTrue(connector.Status.Conditions, condition) + return meta.IsStatusConditionPresentAndEqual(connector.Status.Conditions, condition, status) + } +} + +func isAttachedConnectorBindingStatusCondition(name string, namespace string, condition string, status metav1.ConditionStatus) WaitFunction { + return func(t *testing.T, clients internalclient.Clients) bool { + binding, err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace).Get(context.Background(), name, metav1.GetOptions{}) + assert.Assert(t, err) + return meta.IsStatusConditionPresentAndEqual(binding.Status.Conditions, condition, status) + } +} + +func podWatchers(expectedRunning int, expectedStopped int) ControllerFunction { + return func(t *testing.T, controller *Controller) bool { + for { + var running int + var stopped int + for _, w := range controller.eventProcessor.GetWatchers() { + if rw, ok := w.(*watchers.ResourceWatcher[*corev1.Pod]); ok { + if rw.IsStopped() { + stopped++ + } else { + running++ + } + } + } + if expectedRunning == running && expectedStopped == stopped { + return true + } + t.Logf("PodWatchers count do not match - Expected Running %d/%d - Expected Stopped %d/%d", running, expectedRunning, stopped, expectedStopped) + return false + } + } +} + +func deleteSite(name string, namespace string) WaitFunction { + return func(t *testing.T, clients internalclient.Clients) bool { + site, err := clients.GetSkupperClient().SkupperV2alpha1().Sites(namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err == nil && site != nil { + _ = clients.GetSkupperClient().SkupperV2alpha1().Sites(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}) + return false + } + return true } } diff --git a/internal/kube/site/attached_connector.go b/internal/kube/site/attached_connector.go index 7763511bf..946d56946 100644 --- a/internal/kube/site/attached_connector.go +++ b/internal/kube/site/attached_connector.go @@ -144,6 +144,12 @@ func (a *AttachedConnector) Updated(pods []skupperv2alpha1.PodDetails) error { if a.binding == nil { return a.updateStatusNoBinding() } + a.parent.logger.Debug("Updated AttachedConnector pods", + slog.String("Namespace", a.binding.Namespace), + slog.String("Name", a.binding.Name), + slog.String("siteId", a.parent.bindings.SiteId), + slog.String("memory", fmt.Sprintf("%p", a)), + ) definition := a.activeDefinition() if definition == nil { return a.updateStatusTo(fmt.Errorf("No matching AttachedConnector"), nil) @@ -230,7 +236,8 @@ func (a *AttachedConnector) definitionUpdated(definition *skupperv2alpha1.Attach } a.definitions[definition.Namespace] = definition if a.binding != nil && a.binding.Spec.ConnectorNamespace == definition.Namespace { - if selectorChanged || a.watcher == nil { + isSiteActive := a.parent.site != nil && a.parent.site.IsInitialised() + if isSiteActive && (selectorChanged || a.watcher == nil) { a.parent.logger.Info("Watching pods for AttachedConnector", slog.String("namespace", definition.Namespace), slog.String("name", definition.Name)) @@ -271,6 +278,7 @@ func (a *AttachedConnector) definitionDeleted(namespace string) bool { if _, ok := a.definitions[namespace]; ok { if a.watcher != nil { a.watcher.Close() + a.watcher = nil } delete(a.definitions, namespace) return true @@ -282,14 +290,19 @@ func (a *AttachedConnector) bindingDeleted() bool { if a.binding == nil { return false } + a.parent.logger.Info("AttachedConnectorBinding deleted", + slog.String("key", fmt.Sprintf("%s/%s", a.binding.Namespace, a.binding.Name)), + ) a.binding = nil + a.unbind() return true } -func (a *AttachedConnector) updateBridgeConfig(siteId string, config *qdr.BridgeConfig) { +func (a *AttachedConnector) updateBridgeConfig(siteId string, config *qdr.BridgeConfig) bool { + var updated bool definition := a.activeDefinition() if definition == nil || a.watcher == nil { - return + return updated } connector := &skupperv2alpha1.Connector{ ObjectMeta: metav1.ObjectMeta{ @@ -303,6 +316,18 @@ func (a *AttachedConnector) updateBridgeConfig(siteId string, config *qdr.Bridge }, } for _, pod := range a.watcher.pods() { - site.UpdateBridgeConfigForConnectorToPod(siteId, connector, pod, a.binding.Spec.ExposePodsByName, config) + if site.UpdateBridgeConfigForConnectorToPod(siteId, connector, pod, a.binding.Spec.ExposePodsByName, config) { + updated = true + } + } + return updated +} + +func (a *AttachedConnector) unbind() bool { + if a.watcher != nil { + a.watcher.Close() + a.watcher = nil + return true } + return false } diff --git a/internal/kube/site/bindings.go b/internal/kube/site/bindings.go index 0d524df83..e860fbc9c 100644 --- a/internal/kube/site/bindings.go +++ b/internal/kube/site/bindings.go @@ -124,5 +124,6 @@ func (w *PodWatcher) handle(key string, pod *corev1.Pod) error { } func (w *PodWatcher) Close() { + bindings_logger.Debug("Stopping pod watcher", w.context.Attr(), slog.String("selector", w.context.Selector())) close(w.stopCh) } diff --git a/internal/kube/site/extended_bindings.go b/internal/kube/site/extended_bindings.go index e4f12fe85..ad37ad605 100644 --- a/internal/kube/site/extended_bindings.go +++ b/internal/kube/site/extended_bindings.go @@ -2,6 +2,7 @@ package site import ( "errors" + "fmt" "log/slog" "github.com/skupperproject/skupper/internal/kube/watchers" @@ -55,6 +56,11 @@ func (a *ExtendedBindings) cleanup() { for _, s := range a.selectors { s.Close() } + for _, connector := range a.connectors { + if connector.watcher != nil { + connector.watcher.Close() + } + } } func (a *ExtendedBindings) ConnectorUpdated(connector *skupperv2alpha1.Connector) bool { @@ -253,18 +259,31 @@ func (b *ExtendedBindings) MapOverAttachedConnectors(cf AttachedConnectorFunctio } func (b *ExtendedBindings) Apply(config *qdr.RouterConfig) bool { + var updated bool desired := b.bindings.ToBridgeConfig() for _, connector := range b.connectors { - connector.updateBridgeConfig(b.bindings.SiteId, &desired) - b.AddSslProfiles(config, connector.definitions) + if connector.updateBridgeConfig(b.bindings.SiteId, &desired) { + updated = true + } + if b.AddSslProfiles(config, connector.definitions) { + updated = true + } } for _, ptl := range b.perTargetListeners { - ptl.updateBridgeConfig(b.bindings.SiteId, &desired) + if ptl.updateBridgeConfig(b.bindings.SiteId, &desired) { + updated = true + } + } + if b.bindings.AddSslProfiles(config) { + updated = true + } + if config.UpdateBridgeConfig(desired) { + updated = true + } + if config.RemoveUnreferencedSslProfiles() { + updated = true } - b.bindings.AddSslProfiles(config) - config.UpdateBridgeConfig(desired) - config.RemoveUnreferencedSslProfiles() - return true //TODO: can optimise by indicating if no change was required + return updated } func (b *ExtendedBindings) AddSslProfiles(config *qdr.RouterConfig, definitions map[string]*skupperv2alpha1.AttachedConnector) bool { @@ -304,6 +323,17 @@ func (b *ExtendedBindings) checkAttachedConnectorBinding(namespace string, name if !ok { connector = NewAttachedConnector(name, namespace, b) b.connectors[name] = connector + } else if connector.binding != nil && binding != nil { + if connector.binding.Spec.ConnectorNamespace != binding.Spec.ConnectorNamespace { + b.logger.Info("AttachedConnectorBinding connector namespace has changed", + slog.String("key", fmt.Sprintf("%s/%s", namespace, name)), + slog.String("from", connector.binding.Spec.ConnectorNamespace), + slog.String("to", binding.Spec.ConnectorNamespace), + ) + connector.unbind() + connector = NewAttachedConnector(name, namespace, b) + b.connectors[name] = connector + } } if (binding == nil && connector.bindingDeleted()) || (binding != nil && connector.bindingUpdated(binding)) { if b.site != nil { @@ -348,6 +378,16 @@ func (b *ExtendedBindings) attachedConnectorDeleted(namespace string, name strin return nil } +func (b *ExtendedBindings) attachedConnectorUnreferenced(namespace string, name string) error { + if connector, ok := b.connectors[name]; ok && connector.definitionDeleted(namespace) { + delete(b.connectors, name) + if err := connector.Updated(nil); err != nil { + return err + } + } + return nil +} + func (b *ExtendedBindings) networkUpdated(network []skupperv2alpha1.SiteRecord) qdr.ConfigUpdate { changed := false for _, ptl := range b.perTargetListeners { diff --git a/internal/kube/site/per_target_listener.go b/internal/kube/site/per_target_listener.go index fce46120e..6a851b230 100644 --- a/internal/kube/site/per_target_listener.go +++ b/internal/kube/site/per_target_listener.go @@ -125,18 +125,22 @@ func (p *PerTargetListener) unexpose(target string, mapping *qdr.PortMapping, ex return nil } -func (p *PerTargetListener) updateBridgeConfig(siteId string, config *qdr.BridgeConfig) { +func (p *PerTargetListener) updateBridgeConfig(siteId string, config *qdr.BridgeConfig) bool { + var updated bool for target, port := range p.targets { if p.definition.Spec.Type == "tcp" || p.definition.Spec.Type == "" { - config.AddTcpListener(qdr.TcpEndpoint{ + if config.AddTcpListener(qdr.TcpEndpoint{ Name: p.definition.Name + "@" + target, SiteId: siteId, Port: strconv.Itoa(port), Address: p.address(target), SslProfile: p.definition.Spec.TlsCredentials, - }) + }) { + updated = true + } } } + return updated } func extractTargets(prefix string, network []skupperv2alpha1.SiteRecord) []string { diff --git a/internal/kube/site/site.go b/internal/kube/site/site.go index 4738187c2..697534afa 100644 --- a/internal/kube/site/site.go +++ b/internal/kube/site/site.go @@ -1342,8 +1342,12 @@ func (s *Site) AttachedConnectorUpdated(connector *skupperv2alpha1.AttachedConne return s.bindings.attachedConnectorUpdated(connector.Name, connector) } +func (s *Site) AttachedConnectorUnreferenced(connector *skupperv2alpha1.AttachedConnector) error { + return s.bindings.attachedConnectorUnreferenced(connector.Namespace, connector.Name) +} + func (s *Site) AttachedConnectorDeleted(namespace string, name string) error { - return s.bindings.attachedConnectorDeleted(name, namespace) + return s.bindings.attachedConnectorDeleted(namespace, name) } func (s *Site) GetSite() *skupperv2alpha1.Site { diff --git a/internal/kube/watchers/resources.go b/internal/kube/watchers/resources.go index 19bcdf809..f3f4f80e9 100644 --- a/internal/kube/watchers/resources.go +++ b/internal/kube/watchers/resources.go @@ -145,3 +145,7 @@ func (w ResourceWatcher[T]) HasSynced() func() bool { func (w ResourceWatcher[T]) Sync(stopCh <-chan struct{}) bool { return cache.WaitForCacheSync(stopCh, w.informer.HasSynced) } + +func (w ResourceWatcher[T]) IsStopped() bool { + return w.informer.IsStopped() +} diff --git a/internal/kube/watchers/watchers.go b/internal/kube/watchers/watchers.go index 2719e068f..09c645791 100644 --- a/internal/kube/watchers/watchers.go +++ b/internal/kube/watchers/watchers.go @@ -78,12 +78,15 @@ type EventProcessor struct { skupperClient skupperclient.Interface queue workqueue.RateLimitingInterface resync time.Duration + resyncShort time.Duration watchers []Watcher } +type EventProcessorCustomizer func(e *EventProcessor) + // Creates a properly initialised EventProcessor instance. -func NewEventProcessor(name string, clients internalclient.Clients) *EventProcessor { - return &EventProcessor{ +func NewEventProcessor(name string, clients internalclient.Clients, options ...EventProcessorCustomizer) *EventProcessor { + e := &EventProcessor{ errorKey: name + "Error", client: clients.GetKubeClient(), routeClient: clients.GetRouteInterface(), @@ -92,7 +95,20 @@ func NewEventProcessor(name string, clients internalclient.Clients) *EventProces skupperClient: clients.GetSkupperClient(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), resync: time.Minute * 5, + resyncShort: time.Second * 30, + } + for _, opt := range options { + opt(e) } + return e +} + +func (c *EventProcessor) SetResync(resync time.Duration) { + c.resync = resync +} + +func (c *EventProcessor) SetResyncShort(resync time.Duration) { + c.resyncShort = resync } func (c *EventProcessor) GetKubeClient() kubernetes.Interface { @@ -139,6 +155,10 @@ func (c *EventProcessor) Start(stopCh <-chan struct{}) { go wait.Until(c.run, time.Second, stopCh) } +func (c *EventProcessor) GetWatchers() []Watcher { + return c.watchers +} + func (c *EventProcessor) run() { for c.process() { } @@ -396,7 +416,7 @@ func (c *EventProcessor) WatchSites(namespace string, handler SiteHandler) *Site informer := skupperv2alpha1informer.NewSiteInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -405,7 +425,7 @@ func (c *EventProcessor) WatchListeners(namespace string, handler ListenerHandle informer := skupperv2alpha1informer.NewListenerInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -414,7 +434,7 @@ func (c *EventProcessor) WatchConnectors(namespace string, handler ConnectorHand informer := skupperv2alpha1informer.NewConnectorInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -423,7 +443,7 @@ func (c *EventProcessor) WatchLinks(namespace string, handler LinkHandler) *Link informer := skupperv2alpha1informer.NewLinkInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -432,7 +452,7 @@ func (c *EventProcessor) WatchAccessTokens(namespace string, handler AccessToken informer := skupperv2alpha1informer.NewAccessTokenInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -441,7 +461,7 @@ func (c *EventProcessor) WatchAccessGrants(namespace string, handler AccessGrant informer := skupperv2alpha1informer.NewAccessGrantInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -450,7 +470,7 @@ func (c *EventProcessor) WatchSecuredAccesses(namespace string, handler SecuredA informer := skupperv2alpha1informer.NewSecuredAccessInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -459,7 +479,7 @@ func (c *EventProcessor) WatchSecuredAccessesWithOptions(options skupperv2alpha1 informer := skupperv2alpha1informer.NewFilteredSecuredAccessInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, options) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) @@ -469,7 +489,7 @@ func (c *EventProcessor) WatchCertificates(namespace string, handler Certificate informer := skupperv2alpha1informer.NewCertificateInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -478,7 +498,7 @@ func (c *EventProcessor) WatchRouterAccesses(namespace string, handler RouterAcc informer := skupperv2alpha1informer.NewRouterAccessInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -487,7 +507,7 @@ func (c *EventProcessor) WatchAttachedConnectorBindings(namespace string, handle informer := skupperv2alpha1informer.NewAttachedConnectorBindingInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } @@ -496,7 +516,7 @@ func (c *EventProcessor) WatchAttachedConnectors(namespace string, handler Attac informer := skupperv2alpha1informer.NewAttachedConnectorInformer( c.skupperClient, namespace, - time.Second*30, + c.resyncShort, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } diff --git a/internal/qdr/qdr.go b/internal/qdr/qdr.go index fa0c45456..6d28ff702 100644 --- a/internal/qdr/qdr.go +++ b/internal/qdr/qdr.go @@ -238,8 +238,15 @@ func (r *RouterConfig) SetSiteMetadata(site *SiteMetadata) { r.Metadata.Metadata = getSiteMetadataString(site.Id, site.Version) } -func (bc *BridgeConfig) AddTcpConnector(e TcpEndpoint) { +func (bc *BridgeConfig) AddTcpConnector(e TcpEndpoint) bool { + var updated = true + if existing, ok := bc.TcpConnectors[e.Name]; ok { + if e == existing { + updated = false + } + } bc.TcpConnectors[e.Name] = e + return updated } func (bc *BridgeConfig) RemoveTcpConnector(name string) (bool, TcpEndpoint) { @@ -252,8 +259,15 @@ func (bc *BridgeConfig) RemoveTcpConnector(name string) (bool, TcpEndpoint) { } } -func (bc *BridgeConfig) AddTcpListener(e TcpEndpoint) { +func (bc *BridgeConfig) AddTcpListener(e TcpEndpoint) bool { + var updated = true + if existing, ok := bc.TcpListeners[e.Name]; ok { + if e == existing { + updated = false + } + } bc.TcpListeners[e.Name] = e + return updated } func (bc *BridgeConfig) RemoveTcpListener(name string) (bool, TcpEndpoint) { diff --git a/internal/site/connector.go b/internal/site/connector.go index 12a1c8804..f6709f897 100644 --- a/internal/site/connector.go +++ b/internal/site/connector.go @@ -13,16 +13,22 @@ func UpdateBridgeConfigForConnector(siteId string, connector *skupperv2alpha1.Co } } -func UpdateBridgeConfigForConnectorToPod(siteId string, connector *skupperv2alpha1.Connector, pod skupperv2alpha1.PodDetails, addQualifiedAddress bool, config *qdr.BridgeConfig) { - updateBridgeConfigForConnector(connector.Name+"@"+pod.IP, siteId, connector, pod.IP, pod.UID, connector.Spec.RoutingKey, config) +func UpdateBridgeConfigForConnectorToPod(siteId string, connector *skupperv2alpha1.Connector, pod skupperv2alpha1.PodDetails, addQualifiedAddress bool, config *qdr.BridgeConfig) bool { + updated := false + if updateBridgeConfigForConnector(connector.Name+"@"+pod.IP, siteId, connector, pod.IP, pod.UID, connector.Spec.RoutingKey, config) { + updated = true + } if addQualifiedAddress { - updateBridgeConfigForConnector(connector.Name+"@"+pod.Name, siteId, connector, pod.IP, pod.UID, connector.Spec.RoutingKey+"."+pod.Name, config) + if updateBridgeConfigForConnector(connector.Name+"@"+pod.Name, siteId, connector, pod.IP, pod.UID, connector.Spec.RoutingKey+"."+pod.Name, config) { + updated = true + } } + return updated } -func updateBridgeConfigForConnector(name string, siteId string, connector *skupperv2alpha1.Connector, host string, processID string, address string, config *qdr.BridgeConfig) { +func updateBridgeConfigForConnector(name string, siteId string, connector *skupperv2alpha1.Connector, host string, processID string, address string, config *qdr.BridgeConfig) bool { if connector.Spec.Type == "tcp" || connector.Spec.Type == "" { - config.AddTcpConnector(qdr.TcpEndpoint{ + return config.AddTcpConnector(qdr.TcpEndpoint{ Name: name, SiteId: siteId, Host: host, @@ -33,6 +39,7 @@ func updateBridgeConfigForConnector(name string, siteId string, connector *skupp VerifyHostname: getVerifyHostname(connector), }) } + return false } func GetSslProfileName(tlsCredentials string, useClientCert bool) string {