Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions images/virtualization-dra/cmd/usb-gateway/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package app

import (
"fmt"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/component-base/cli/flag"

"github.com/deckhouse/virtualization-dra/internal/controller/resourceclaim"
"github.com/deckhouse/virtualization-dra/internal/informer"
"github.com/deckhouse/virtualization-dra/pkg/logger"
)

// NewUSBGatewayCommand builds the cobra command for the usb-gateway binary.
func NewUSBGatewayCommand() *cobra.Command {
	// Logging must be non-nil up front: NamedFlags calls o.Logging.AddFlags and
	// PreRunE calls o.Logging.Complete(), both of which would panic on a nil
	// pointer. NOTE(review): assumes the zero value of logger.Options is usable;
	// confirm the package does not require a dedicated constructor.
	o := &usbOptions{Logging: &logger.Options{}}

	cmd := &cobra.Command{
		Use:           "usb-gateway",
		Short:         "USB gateway",
		Long:          "USB gateway",
		SilenceUsage:  true,
		SilenceErrors: true,
		// PreRunE validates options and installs the configured default logger
		// before RunE executes.
		PreRunE: func(cmd *cobra.Command, args []string) error {
			if err := o.Validate(); err != nil {
				return err
			}
			log := o.Logging.Complete()
			logger.SetDefaultLogger(log)
			return nil
		},
		RunE: o.Run,
	}

	// Register every named flag set (usb-gateway, logging) on the command.
	fs := cmd.Flags()
	for _, f := range o.NamedFlags().FlagSets {
		fs.AddFlagSet(f)
	}

	return cmd
}

// usbOptions holds the command-line configuration for the usb-gateway command.
type usbOptions struct {
	// Kubeconfig is the path to a kubeconfig file; empty means in-cluster config
	// (see clientcmd.BuildConfigFromFlags usage in Run).
	Kubeconfig string
	// NodeName is the name of the node this gateway runs on; required (see Validate).
	NodeName string
	// Logging carries logger flag options; consumed by NamedFlags and PreRunE.
	Logging *logger.Options
}

// NamedFlags returns the grouped flag sets ("usb-gateway" and "logging")
// that NewUSBGatewayCommand registers on the command.
func (o *usbOptions) NamedFlags() (fs flag.NamedFlagSets) {
	gateway := fs.FlagSet("usb-gateway")
	gateway.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path to kubeconfig file")
	gateway.StringVar(&o.NodeName, "node-name", o.NodeName, "Node name")

	// Logging flags live in their own group so help output stays organized.
	logging := fs.FlagSet("logging")
	o.Logging.AddFlags(logging)

	return fs
}

// Validate checks that required options are set; it is called from PreRunE
// before the command runs.
func (o *usbOptions) Validate() error {
	if o.NodeName == "" {
		// Lowercase per Go error-string convention, and named after the actual
		// flag (--node-name) so the user knows what to set.
		return fmt.Errorf("node-name is required")
	}

	return nil
}

// Run wires up the Kubernetes client, informer factory, and resourceclaim
// controller, then blocks until the controller stops or the context is canceled.
func (o *usbOptions) Run(cmd *cobra.Command, _ []string) error {
	restConfig, err := clientcmd.BuildConfigFromFlags("", o.Kubeconfig)
	if err != nil {
		return fmt.Errorf("failed to get rest config: %w", err)
	}

	kubeClient, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return fmt.Errorf("failed to create kubernetes client: %w", err)
	}

	ctx := cmd.Context()

	// nil resync means the factory picks its jittered default period.
	factory := informer.NewFactory(kubeClient, nil)
	claimInformer := factory.ResourceClaim()

	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	ctrl, err := resourceclaim.NewController(claimInformer)
	if err != nil {
		return fmt.Errorf("failed to create resourceclaim controller: %w", err)
	}

	// Run the controller with a single worker; Wait returns its error (if any).
	group, groupCtx := errgroup.WithContext(ctx)
	group.Go(func() error {
		return ctrl.Run(groupCtx, 1)
	})

	return group.Wait()
}
20 changes: 20 additions & 0 deletions images/virtualization-dra/cmd/usb-gateway/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"

"github.com/deckhouse/virtualization-dra/cmd/usb-gateway/app"
)

// main runs the usb-gateway command under a signal-aware context and exits
// non-zero on failure.
func main() {
	// Cancel the root context on SIGINT/SIGTERM so the command can shut down
	// gracefully. Keep (and defer) the stop function instead of discarding it,
	// so signal notification resources are released on normal return.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	if err := app.NewUSBGatewayCommand().ExecuteContext(ctx); err != nil {
		slog.Error("failed to execute command", slog.Any("err", err))
		os.Exit(1)
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package resourceclaim

import (
"context"
"fmt"
"log/slog"
"time"

resourcev1beta1 "k8s.io/api/resource/v1beta1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

// controllerName identifies this controller in log output.
const controllerName = "resourceclaim-controller"

var (
	// keyFunc maps an object (or its deletion tombstone) to a namespace/name
	// cache key for the work queue.
	keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)

// Controller watches ResourceClaim objects and processes them through a
// rate-limited work queue.
type Controller struct {
	// resourceClaimIndexer is the informer's local cache of ResourceClaims.
	resourceClaimIndexer cache.Indexer
	// queue holds namespace/name keys of claims awaiting reconciliation.
	queue workqueue.TypedRateLimitingInterface[string]
	log   *slog.Logger
	// hasSynced reports whether the informer cache has completed its initial sync.
	hasSynced cache.InformerSynced
}

// NewController builds a Controller wired to the given ResourceClaim informer.
// It registers add/update/delete handlers and returns an error if handler
// registration fails.
func NewController(resourceClaimInformer cache.SharedIndexInformer) (*Controller, error) {
	// Name the queue with the controllerName constant (same value as before)
	// instead of a duplicated string literal, so the two cannot drift apart.
	queue := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: controllerName},
	)
	log := slog.With(slog.String("controller", controllerName))

	c := &Controller{
		resourceClaimIndexer: resourceClaimInformer.GetIndexer(),
		queue:                queue,
		log:                  log,
	}

	_, err := resourceClaimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addResourceClaim,
		UpdateFunc: c.updateResourceClaim,
		DeleteFunc: c.deleteResourceClaim,
	})
	if err != nil {
		return nil, fmt.Errorf("unable to add event handler to resourceclaim informer: %w", err)
	}

	c.hasSynced = resourceClaimInformer.HasSynced

	return c, nil
}

// addResourceClaim handles informer add events by enqueueing the claim.
func (c *Controller) addResourceClaim(obj interface{}) {
	rc, ok := obj.(*resourcev1beta1.ResourceClaim)
	if !ok {
		return
	}
	c.enqueueResourceClaim(rc)
}

// deleteResourceClaim handles informer delete events by enqueueing the claim.
// Delete notifications may arrive as a cache.DeletedFinalStateUnknown tombstone
// (e.g. after a relist misses the delete); the original type assertion silently
// dropped those, losing the deletion. Unwrap the tombstone instead.
func (c *Controller) deleteResourceClaim(obj interface{}) {
	rc, ok := obj.(*resourcev1beta1.ResourceClaim)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		rc, ok = tombstone.Obj.(*resourcev1beta1.ResourceClaim)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ResourceClaim %#v", tombstone.Obj))
			return
		}
	}
	c.enqueueResourceClaim(rc)
}

// updateResourceClaim handles informer update events. Only updates where the
// old object had no allocation yet are enqueued — i.e. the claim transitioning
// toward (or still awaiting) allocation.
func (c *Controller) updateResourceClaim(oldObj, newObj interface{}) {
	prev, prevOK := oldObj.(*resourcev1beta1.ResourceClaim)
	curr, currOK := newObj.(*resourcev1beta1.ResourceClaim)
	if !prevOK || !currOK {
		return
	}

	// Already-allocated claims are ignored here.
	if prev.Status.Allocation != nil {
		return
	}
	c.enqueueResourceClaim(curr)
}

// enqueueResourceClaim derives the namespace/name key for rc and adds it to
// the work queue; key-derivation failures are reported, not fatal.
func (c *Controller) enqueueResourceClaim(rc *resourcev1beta1.ResourceClaim) {
	if key, err := keyFunc(rc); err == nil {
		c.queueAdd(key)
	} else {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", rc, err))
	}
}

// queueAdd places a cache key on the work queue for the workers to process.
func (c *Controller) queueAdd(key string) {
	c.queue.Add(key)
}

// Run starts the controller: it waits for the informer cache to sync, launches
// the requested number of workers, and blocks until ctx is canceled. The work
// queue is shut down on return, which stops the workers.
func (c *Controller) Run(ctx context.Context, workers int) error {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	c.log.Info("Starting controller")
	defer c.log.Info("Shutting down controller")

	synced := cache.WaitForCacheSync(ctx.Done(), c.hasSynced)
	if !synced {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	c.log.Info("Starting workers controller")
	for w := 0; w < workers; w++ {
		// Each worker loops until the queue shuts down; UntilWithContext
		// restarts it after a second if it ever returns early.
		go wait.UntilWithContext(ctx, c.worker, time.Second)
	}

	<-ctx.Done()
	return nil
}

// worker drains the queue one item at a time until the queue shuts down.
// Failed items are re-queued with rate limiting; successful ones are forgotten.
func (c *Controller) worker(ctx context.Context) {
	processNext := func(context.Context) (keepGoing bool) {
		item, shutdown := c.queue.Get()
		if shutdown {
			return false
		}
		// Done must be called for every Get, whatever the sync outcome.
		defer c.queue.Done(item)

		err := c.sync(item)
		if err == nil {
			c.log.Info(fmt.Sprintf("processed ResourceClaim %v", item))
			c.queue.Forget(item)
			return true
		}

		c.log.Error("re-enqueuing", slog.String("key", item), slog.Any("err", err))
		c.queue.AddRateLimited(item)
		return true
	}

	for processNext(ctx) {
	}
}

// sync reconciles the ResourceClaim identified by key (namespace/name).
// TODO: reconciliation is not implemented yet — returning nil marks every
// dequeued item as successfully processed.
func (c *Controller) sync(key string) error {
	return nil
}
95 changes: 95 additions & 0 deletions images/virtualization-dra/internal/informer/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package informer

import (
"log/slog"
"math/rand/v2"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
resourcev1beta1 "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

// NewFactory creates an informer Factory for the given clientset. If resync is
// nil, a jittered default of at least 12 hours is used (see resyncPeriod).
func NewFactory(clientSet *kubernetes.Clientset, resync *time.Duration) *Factory {
	var defaultResync time.Duration
	if resync != nil {
		defaultResync = *resync
	} else {
		defaultResync = resyncPeriod(12 * time.Hour)
	}

	return &Factory{
		clientSet:     clientSet,
		defaultResync: defaultResync,
		informers:     make(map[string]cache.SharedIndexInformer),
		// Bug fix: startedInformers was never allocated, so Start's write to it
		// panicked on a nil map.
		startedInformers: make(map[string]struct{}),
	}
}

// Factory lazily creates and caches shared informers backed by a single
// Kubernetes clientset, and tracks which of them have been started.
type Factory struct {
	clientSet *kubernetes.Clientset
	// defaultResync is applied to every informer this factory creates.
	defaultResync time.Duration

	// informers caches created informers by key; guarded by mu.
	informers map[string]cache.SharedIndexInformer
	// startedInformers records which informers Start has already launched;
	// guarded by mu.
	startedInformers map[string]struct{}
	mu               sync.Mutex
}

// Start launches every informer that has not been started yet, each in its own
// goroutine running until stopCh closes. Safe to call repeatedly: informers
// already started are skipped.
func (f *Factory) Start(stopCh <-chan struct{}) {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Defensive: NewFactory may not have allocated this map; writing to a nil
	// map panics, so allocate lazily here.
	if f.startedInformers == nil {
		f.startedInformers = make(map[string]struct{})
	}

	for name, informer := range f.informers {
		if _, found := f.startedInformers[name]; found {
			// skip informers that have already started.
			slog.Info("SKIPPING informer", slog.String("name", name))
			continue
		}
		slog.Info("STARTING informer", slog.String("name", name))
		go informer.Run(stopCh)
		f.startedInformers[name] = struct{}{}
	}
}

// WaitForCacheSync blocks until every known informer's cache has synced, or
// stopCh closes. A sync failure is logged rather than silently discarded
// (the previous code ignored WaitForCacheSync's boolean result entirely).
func (f *Factory) WaitForCacheSync(stopCh <-chan struct{}) {
	f.mu.Lock()
	// Pre-size: the number of informers is known while the lock is held.
	syncs := make([]cache.InformerSynced, 0, len(f.informers))
	for name, informer := range f.informers {
		slog.Info("Waiting for cache sync of informer", slog.String("name", name))
		syncs = append(syncs, informer.HasSynced)
	}
	f.mu.Unlock()

	if !cache.WaitForCacheSync(stopCh, syncs...) {
		slog.Error("failed to sync informer caches before stop")
	}
}

// ResourceClaim returns the shared informer for ResourceClaim objects across
// all namespaces, creating it on first use and caching it thereafter.
func (f *Factory) ResourceClaim() cache.SharedIndexInformer {
	build := func() cache.SharedIndexInformer {
		watcher := cache.NewListWatchFromClient(
			f.clientSet.ResourceV1beta1().RESTClient(),
			"resourceclaims",
			corev1.NamespaceAll,
			fields.Everything(),
		)
		indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
		return cache.NewSharedIndexInformer(watcher, &resourcev1beta1.ResourceClaim{}, f.defaultResync, indexers)
	}
	return f.getInformer("resourceClaimInformer", build)
}

// getInformer returns the cached informer for key, or creates one with newFunc,
// caches, and returns it. Creation and lookup are serialized by the mutex so
// only one informer is ever built per key.
func (f *Factory) getInformer(key string, newFunc func() cache.SharedIndexInformer) cache.SharedIndexInformer {
	f.mu.Lock()
	defer f.mu.Unlock()

	if existing, found := f.informers[key]; found {
		return existing
	}

	created := newFunc()
	f.informers[key] = created
	return created
}

// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server
func resyncPeriod(minResyncPeriod time.Duration) time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
}
17 changes: 17 additions & 0 deletions images/virtualization-dra/internal/usbip/attacher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package usbip

// NewUSBAttacher returns the default USBAttacher implementation.
// NOTE(review): both methods are unimplemented stubs that panic; callers must
// not invoke them yet.
func NewUSBAttacher() USBAttacher {
	return &usbAttacher{}
}

type usbAttacher struct{}

// Attach attaches the USB device identified by busID — presumably via the
// usbip tooling given the package name; TODO confirm once implemented.
// Currently an unimplemented stub that panics.
func (a usbAttacher) Attach(busID string) error {
	//TODO implement me
	panic("implement me")
}

// Detach detaches the USB device identified by busID (counterpart to Attach).
// Currently an unimplemented stub that panics.
func (a usbAttacher) Detach(busID string) error {
	//TODO implement me
	panic("implement me")
}
Loading
Loading