From 6f5b0a4e8ee590d31277fbf3ec2a115b8dc91676 Mon Sep 17 00:00:00 2001 From: Aritra Basu Date: Wed, 1 Oct 2025 14:57:06 -0700 Subject: [PATCH] Remove bgp_conf_watcher dependency on pubsub Signed-off-by: Aritra Basu --- calico-vpp-agent/cmd/calico_vpp_dataplane.go | 2 +- calico-vpp-agent/common/pubsub.go | 1 - calico-vpp-agent/felix/felix_server.go | 11 +- .../watchers/bgp_configuration_watcher.go | 120 +++++++++++++----- 4 files changed, 99 insertions(+), 35 deletions(-) diff --git a/calico-vpp-agent/cmd/calico_vpp_dataplane.go b/calico-vpp-agent/cmd/calico_vpp_dataplane.go index bae4d9c2e..37b8e39ba 100644 --- a/calico-vpp-agent/cmd/calico_vpp_dataplane.go +++ b/calico-vpp-agent/cmd/calico_vpp_dataplane.go @@ -134,7 +134,6 @@ func main() { */ routeWatcher := watchers.NewRouteWatcher(log.WithFields(logrus.Fields{"subcomponent": "host-route-watcher"})) linkWatcher := watchers.NewLinkWatcher(common.VppManagerInfo.UplinkStatuses, log.WithFields(logrus.Fields{"subcomponent": "host-link-watcher"})) - bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"})) prefixWatcher := watchers.NewPrefixWatcher(client, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"})) peerWatcher := watchers.NewPeerWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "peer-watcher"})) bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"})) @@ -143,6 +142,7 @@ func main() { prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"})) localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"})) felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"})) + bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"}), felixServer.HandleBGPConfigurationChange) felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"})) cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"})) serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"})) diff --git a/calico-vpp-agent/common/pubsub.go b/calico-vpp-agent/common/pubsub.go index 2ac7b253e..8e524bec9 100644 --- a/calico-vpp-agent/common/pubsub.go +++ b/calico-vpp-agent/common/pubsub.go @@ -28,7 +28,6 @@ const ( PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged" IpamConfChanged CalicoVppEventType = "IpamConfChanged" - BGPConfChanged CalicoVppEventType = "BGPConfChanged" ConnectivityAdded CalicoVppEventType = "ConnectivityAdded" ConnectivityDeleted CalicoVppEventType = "ConnectivityDeleted" diff --git a/calico-vpp-agent/felix/felix_server.go b/calico-vpp-agent/felix/felix_server.go index e13fc1287..a2dfcd53c 100644 --- a/calico-vpp-agent/felix/felix_server.go +++ b/calico-vpp-agent/felix/felix_server.go @@ -108,6 +108,14 @@ func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) { s.cache.BGPConf = bgpConf } +// HandleBGPConfigurationChange is called when the BGPConfiguration changes. +// Handling of BGPConfiguration updates is not yet implemented, instead, +// we log and trigger a restart to ensure the system reloads configuration. +func (s *Server) HandleBGPConfigurationChange() error { + s.log.Error("BGPConf updated") + return errors.Errorf("BGPConf updated, restarting") +} + func (s *Server) getMainInterface() *config.UplinkStatus { for _, i := range common.VppManagerInfo.UplinkStatuses { if i.IsMain { @@ -294,9 +302,6 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) { s.log.Debugf("Ignoring NamespaceRemove") case *proto.GlobalBGPConfigUpdate: s.log.Infof("Got GlobalBGPConfigUpdate") - common.SendEvent(common.CalicoVppEvent{ - Type: common.BGPConfChanged, - }) case *proto.WireguardEndpointUpdate: err = s.connectivityHandler.OnWireguardEndpointUpdate(evt) case *proto.WireguardEndpointRemove: diff --git a/calico-vpp-agent/watchers/bgp_configuration_watcher.go b/calico-vpp-agent/watchers/bgp_configuration_watcher.go index c6996a82f..b22fe790b 100644 --- a/calico-vpp-agent/watchers/bgp_configuration_watcher.go +++ b/calico-vpp-agent/watchers/bgp_configuration_watcher.go @@ -24,29 +24,31 @@ import ( calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3" calicoerr "github.com/projectcalico/calico/libcalico-go/lib/errors" "github.com/projectcalico/calico/libcalico-go/lib/options" + "github.com/projectcalico/calico/libcalico-go/lib/watch" "github.com/sirupsen/logrus" "golang.org/x/net/context" "gopkg.in/tomb.v2" - "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" "github.com/projectcalico/vpp-dataplane/v3/config" ) type BGPConfigurationWatcher struct { - log *logrus.Entry - clientv3 calicov3cli.Interface - BGPConfigurationWatcherEventChan chan any - BGPConf *calicov3.BGPConfigurationSpec + log *logrus.Entry + clientv3 calicov3cli.Interface + BGPConf *calicov3.BGPConfigurationSpec + // Watch interface for monitoring BGP configuration changes + watcher watch.Interface + currentWatchRevision string + // Callback function to handle BGP configuration changes + onBGPConfigChanged func() error } -func NewBGPConfigurationWatcher(clientv3 calicov3cli.Interface, log *logrus.Entry) *BGPConfigurationWatcher { +func NewBGPConfigurationWatcher(clientv3 calicov3cli.Interface, log *logrus.Entry, configChangeHandler func() error) *BGPConfigurationWatcher { w := BGPConfigurationWatcher{ - log: log, - clientv3: clientv3, - BGPConfigurationWatcherEventChan: make(chan any, common.ChanSize), + log: log, + clientv3: clientv3, + onBGPConfigChanged: configChangeHandler, } - reg := common.RegisterHandler(w.BGPConfigurationWatcherEventChan, "BGP Config watcher events") - reg.ExpectEvents(common.BGPConfChanged) return &w } @@ -126,31 +128,89 @@ func (w *BGPConfigurationWatcher) getDefaultBGPConfig() (*calicov3.BGPConfigurat } } +// WatchBGPConfiguration watches for changes in BGP configuration using Calico API func (w *BGPConfigurationWatcher) WatchBGPConfiguration(t *tomb.Tomb) error { + w.log.Info("BGP configuration watcher started") for t.Alive() { - select { - case <-t.Dying(): - w.log.Warn("BGPConf watcher stopped") - return nil - case msg := <-w.BGPConfigurationWatcherEventChan: - evt, ok := msg.(common.CalicoVppEvent) - if !ok { - continue - } - switch evt.Type { - case common.BGPConfChanged: - oldBGPConf := w.BGPConf - newBGPConf, err := w.GetBGPConf() - if err != nil { - return errors.Wrap(err, "error getting BGP configuration") + w.currentWatchRevision = "" + err := w.resyncAndCreateWatcher() + if err != nil { + w.log.WithError(err).Error("Failed to create BGP configuration watcher") + goto restart + } + for { + select { + case <-t.Dying(): + w.log.Info("BGP configuration watcher asked to stop") + w.cleanExistingWatcher() + return nil + case event, ok := <-w.watcher.ResultChan(): + if !ok { + w.log.Debug("BGP configuration watcher closed, restarting...") + goto restart } - if !reflect.DeepEqual(newBGPConf, oldBGPConf) { - w.log.Error("BGPConf updated") - return errors.Errorf("BGPConf updated, restarting") + w.currentWatchRevision = event.Object.(*calicov3.BGPConfiguration).GetResourceVersion() + switch event.Type { + case watch.Error: + w.log.Debug("BGP configuration watch returned error, restarting...") + goto restart + case watch.Added, watch.Modified: + w.handleBGPConfigurationUpdate() + case watch.Deleted: + w.log.Debug("BGP configuration deleted, using defaults") + w.handleBGPConfigurationUpdate() } - default: } } + restart: + w.cleanExistingWatcher() + w.log.Debug("Restarting BGP configuration watcher...") + } + return nil +} + +// resyncAndCreateWatcher creates a new watcher for BGP configurations +func (w *BGPConfigurationWatcher) resyncAndCreateWatcher() error { + w.cleanExistingWatcher() + + opts := options.ListOptions{ + ResourceVersion: w.currentWatchRevision, + } + + watcher, err := w.clientv3.BGPConfigurations().Watch(context.Background(), opts) + if err != nil { + return errors.Wrap(err, "failed to create BGP configuration watcher") } + w.watcher = watcher return nil } + +// cleanExistingWatcher closes the existing watcher if it exists +func (w *BGPConfigurationWatcher) cleanExistingWatcher() { + if w.watcher != nil { + w.watcher.Stop() + w.watcher = nil + } +} + +// handleBGPConfigurationUpdate handles BGP configuration update events +func (w *BGPConfigurationWatcher) handleBGPConfigurationUpdate() { + if w.onBGPConfigChanged == nil { + w.log.Debug("No BGP configuration change handler set") + return + } + + oldConf := w.BGPConf + newConf, err := w.GetBGPConf() + if err != nil { + w.log.WithError(err).Error("Failed to get updated BGP configuration") + return + } + + // Only call the callback if the config actually changed + if !reflect.DeepEqual(oldConf, newConf) { + if err := w.onBGPConfigChanged(); err != nil { + w.log.WithError(err).Error("BGP configuration change handler failed") + } + } +}