Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion internal/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ func labelling() internalinterfaces.TweakListOptionsFunc {
}
}

var eventProcessorCustomizers []watchers.EventProcessorCustomizer

func NewController(cli internalclient.Clients, config *Config) (*Controller, error) {
controller := &Controller{
eventProcessor: watchers.NewEventProcessor("Controller", cli),
eventProcessor: watchers.NewEventProcessor("Controller", cli, eventProcessorCustomizers...),
sites: map[string]*site.Site{},
siteSizing: sizing.NewRegistry(),
labelling: labels.NewLabelsAndAnnotations(config.Namespace),
Expand Down Expand Up @@ -448,12 +450,29 @@ func (c *Controller) checkAttachedConnectorBinding(key string, binding *skupperv
func (c *Controller) checkAttachedConnector(key string, connector *skupperv2alpha1.AttachedConnector) error {
if connector == nil {
if previous, ok := c.attachableConnectors[key]; ok {
c.log.Info("AttachedConnector deleted", slog.String("key", key))
delete(c.attachableConnectors, key)
return c.getSite(previous.Spec.SiteNamespace).AttachedConnectorDeleted(previous.Namespace, previous.Name)
} else {
return nil
}
} else {
if previous, ok := c.attachableConnectors[key]; ok {
if previous.Spec.SiteNamespace != connector.Spec.SiteNamespace {
c.log.Info("AttachedConnector site namespace has changed",
slog.String("key", key),
slog.String("from", previous.Spec.SiteNamespace),
slog.String("to", connector.Spec.SiteNamespace),
)
err := c.getSite(previous.Spec.SiteNamespace).AttachedConnectorUnreferenced(previous)
if err != nil {
c.log.Error("Error removing AttachedConnector reference from previous namespace",
slog.String("key", key),
slog.String("previous", previous.Spec.SiteNamespace))
}
}
}
c.attachableConnectors[key] = connector
return c.getSite(connector.Spec.SiteNamespace).AttachedConnectorUpdated(connector)
}
}
Expand Down
228 changes: 227 additions & 1 deletion internal/kube/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/google/uuid"
"github.com/skupperproject/skupper/internal/kube/watchers"
"gotest.tools/v3/assert"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -37,6 +39,7 @@ import (
)

type WaitFunction func(t *testing.T, clients internalclient.Clients) bool
type ControllerFunction func(t *testing.T, controller *Controller) bool

func TestGeneral(t *testing.T) {
fakeNetworkStatus := f.fakeNetworkStatusInfo("test")
Expand Down Expand Up @@ -605,6 +608,7 @@ func TestUpdate(t *testing.T) {
k8sObjects []runtime.Object
skupperObjects []runtime.Object
functions []WaitFunction
postFunctions []ControllerFunction
}{
{
name: "change listener host",
Expand Down Expand Up @@ -640,8 +644,126 @@ func TestUpdate(t *testing.T) {
deleteTargetPod("mypod-1", "test"),
serviceCheck("mypod-1", "test").checkAbsent,
},
}, {
name: "unreferenced attached connector",
skupperObjects: []runtime.Object{
f.site("mysite", "test", "", false, false),
f.attachedConnectorBinding("myconnector", "test", "test2"),
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
f.listener("mylistener", "test", "mysvc", 8080),
},
k8sObjects: []runtime.Object{
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
},
functions: []WaitFunction{
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
serviceCheck("mysvc", "test").check,
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
updateAttachedConnectorSiteNamespace("myconnector", "test2", "test3", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
},
postFunctions: []ControllerFunction{
podWatchers(1, 1),
},
}, {
name: "deleted attached connector",
skupperObjects: []runtime.Object{
f.site("mysite", "test", "", false, false),
f.attachedConnectorBinding("myconnector", "test", "test2"),
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
f.listener("mylistener", "test", "mysvc", 8080),
},
k8sObjects: []runtime.Object{
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
},
functions: []WaitFunction{
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
serviceCheck("mysvc", "test").check,
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
deleteAttachedConnector("myconnector", "test2"),
},
postFunctions: []ControllerFunction{
podWatchers(1, 1),
},
}, {
name: "unreferenced attached connector binding",
skupperObjects: []runtime.Object{
f.site("mysite", "test", "", false, false),
f.attachedConnectorBinding("myconnector", "test", "test2"),
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
f.listener("mylistener", "test", "mysvc", 8080),
},
k8sObjects: []runtime.Object{
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
},
functions: []WaitFunction{
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
serviceCheck("mysvc", "test").check,
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue),
updateAttachedConnectorBindingConnectorNamespace("myconnector", "test", "test3", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
},
postFunctions: []ControllerFunction{
podWatchers(1, 1),
},
}, {
name: "deleted attached connector binding",
skupperObjects: []runtime.Object{
f.site("mysite", "test", "", false, false),
f.attachedConnectorBinding("myconnector", "test", "test2"),
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
f.listener("mylistener", "test", "mysvc", 8080),
},
k8sObjects: []runtime.Object{
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
},
functions: []WaitFunction{
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
serviceCheck("mysvc", "test").check,
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue),
deleteAttachedConnectorBinding("myconnector", "test"),
isAttachedConnectorStatusCondition("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionFalse),
},
postFunctions: []ControllerFunction{
podWatchers(1, 1),
},
}, {
name: "site deleted",
skupperObjects: []runtime.Object{
f.addUID(f.site("mysite", "test", "", false, false), "49b03ad4-d414-42be-bbb5-b32d7d4ca503"),
f.attachedConnectorBinding("myconnector", "test", "test2"),
f.attachedConnector("myconnector", "test2", "test", "app=foo", 8080),
f.listener("mylistener", "test", "mysvc", 8080),
},
k8sObjects: []runtime.Object{
f.pod("foo", "test2", map[string]string{"app": "foo"}, nil,
f.podStatus("10.1.1.10", corev1.PodRunning, f.podCondition(corev1.PodReady, corev1.ConditionTrue))),
},
functions: []WaitFunction{
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
serviceCheck("mysvc", "test").check,
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorStatusConditionTrue("myconnector", "test2", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
isAttachedConnectorBindingStatusCondition("myconnector", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED, metav1.ConditionTrue),
deleteSite("mysite", "test"),
},
postFunctions: []ControllerFunction{
podWatchers(1, 1),
},
},
}
defer func() {
eventProcessorCustomizers = nil
}()
for _, tt := range testTable {
t.Run(tt.name, func(t *testing.T) {
flags := &flag.FlagSet{}
Expand All @@ -654,6 +776,11 @@ func TestUpdate(t *testing.T) {
clients, err := fakeclient.NewFakeClient(config.Namespace, tt.k8sObjects, tt.skupperObjects, "")
assert.Assert(t, err)
enableSSA(clients.GetDynamicClient())
eventProcessorCustomizers = []watchers.EventProcessorCustomizer{
func(e *watchers.EventProcessor) {
e.SetResyncShort(time.Second)
},
}
controller, err := NewController(clients, config)
assert.Assert(t, err)
stopCh := make(chan struct{})
Expand All @@ -668,10 +795,31 @@ func TestUpdate(t *testing.T) {
controller.eventProcessor.TestProcess()
}
}
for _, f := range tt.postFunctions {
for !f(t, controller) {
controller.eventProcessor.TestProcess()
}
}
})
}
}

func deleteAttachedConnector(name string, namespace string) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
assert.Assert(t, err)
return true
}
}

func deleteAttachedConnectorBinding(name string, namespace string) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
assert.Assert(t, err)
return true
}
}

func verifyStatus(t *testing.T, expected skupperv2alpha1.Status, actual skupperv2alpha1.Status) {
t.Helper()
assert.Equal(t, expected.StatusType, actual.StatusType, actual.Message)
Expand Down Expand Up @@ -1643,11 +1791,89 @@ func isListenerStatusConditionTrue(name string, namespace string, condition stri
}
}

func updateAttachedConnectorSiteNamespace(name string, namespace string, siteNamespace string, condition string, conditionStatus metav1.ConditionStatus) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
cli := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace)
connector, err := cli.Get(context.Background(), name, metav1.GetOptions{})
assert.Assert(t, err)
if connector.Spec.SiteNamespace != siteNamespace {
t.Logf("updating siteNamespace")
connector.Spec.SiteNamespace = siteNamespace
_, err = cli.Update(context.Background(), connector, metav1.UpdateOptions{})
assert.Assert(t, err)
return false
}
return isConditionUpToDate(connector.Status.Conditions, condition, connector.ObjectMeta.Generation) && meta.IsStatusConditionPresentAndEqual(connector.Status.Conditions, condition, conditionStatus)
}
}

func updateAttachedConnectorBindingConnectorNamespace(name string, namespace string, connectorNamespace string, condition string, status metav1.ConditionStatus) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
cli := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace)
binding, err := cli.Get(context.Background(), name, metav1.GetOptions{})
assert.Assert(t, err)
if binding.Spec.ConnectorNamespace != connectorNamespace {
t.Logf("updating connectorNamespace")
binding.Spec.ConnectorNamespace = connectorNamespace
_, err = cli.Update(context.Background(), binding, metav1.UpdateOptions{})
assert.Assert(t, err)
return false
}
return isConditionUpToDate(binding.Status.Conditions, condition, binding.ObjectMeta.Generation) && meta.IsStatusConditionPresentAndEqual(binding.Status.Conditions, condition, status)
}
}

func isAttachedConnectorStatusConditionTrue(name string, namespace string, condition string) WaitFunction {
return isAttachedConnectorStatusCondition(name, namespace, condition, metav1.ConditionTrue)
}

func isAttachedConnectorStatusCondition(name string, namespace string, condition string, status metav1.ConditionStatus) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
connector, err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace).Get(context.Background(), name, metav1.GetOptions{})
assert.Assert(t, err)
return meta.IsStatusConditionTrue(connector.Status.Conditions, condition)
return meta.IsStatusConditionPresentAndEqual(connector.Status.Conditions, condition, status)
}
}

func isAttachedConnectorBindingStatusCondition(name string, namespace string, condition string, status metav1.ConditionStatus) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
binding, err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectorBindings(namespace).Get(context.Background(), name, metav1.GetOptions{})
assert.Assert(t, err)
return meta.IsStatusConditionPresentAndEqual(binding.Status.Conditions, condition, status)
}
}

func podWatchers(expectedRunning int, expectedStopped int) ControllerFunction {
return func(t *testing.T, controller *Controller) bool {
for {
var running int
var stopped int
for _, w := range controller.eventProcessor.GetWatchers() {
if rw, ok := w.(*watchers.ResourceWatcher[*corev1.Pod]); ok {
if rw.IsStopped() {
stopped++
} else {
running++
}
}
}
if expectedRunning == running && expectedStopped == stopped {
return true
}
t.Logf("PodWatchers count do not match - Expected Running %d/%d - Expected Stopped %d/%d", running, expectedRunning, stopped, expectedStopped)
return false
}
}
}

func deleteSite(name string, namespace string) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
site, err := clients.GetSkupperClient().SkupperV2alpha1().Sites(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err == nil && site != nil {
_ = clients.GetSkupperClient().SkupperV2alpha1().Sites(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
return false
}
return true
}
}

Expand Down
Loading