From 2ea23ed23bfde439798a86e872d63ce1bd752ac7 Mon Sep 17 00:00:00 2001 From: Nathan Stehr Date: Mon, 9 Jun 2025 13:00:39 -0400 Subject: [PATCH 1/7] adding PluginURL to allow for wasm processors --- api/v1alpha/conduit_types.go | 1 + charts/conduit-operator/templates/crd.yaml | 4 ++++ config/crd/bases/operator.conduit.io_conduits.yaml | 4 ++++ 3 files changed, 9 insertions(+) diff --git a/api/v1alpha/conduit_types.go b/api/v1alpha/conduit_types.go index 8971ac1..2528821 100644 --- a/api/v1alpha/conduit_types.go +++ b/api/v1alpha/conduit_types.go @@ -94,6 +94,7 @@ type ConduitProcessor struct { ID string `json:"id,omitempty"` Name string `json:"name,omitempty"` Plugin string `json:"plugin,omitempty"` + PluginURL string `json:"pluginURL,omitempty"` Condition string `json:"condition,omitempty"` Workers int `json:"workers,omitempty"` diff --git a/charts/conduit-operator/templates/crd.yaml b/charts/conduit-operator/templates/crd.yaml index ab0eb71..d3dd130 100644 --- a/charts/conduit-operator/templates/crd.yaml +++ b/charts/conduit-operator/templates/crd.yaml @@ -76,6 +76,8 @@ spec: type: string plugin: type: string + pluginURL: + type: string settings: items: properties: @@ -188,6 +190,8 @@ spec: type: string plugin: type: string + pluginURL: + type: string settings: items: properties: diff --git a/config/crd/bases/operator.conduit.io_conduits.yaml b/config/crd/bases/operator.conduit.io_conduits.yaml index 5c9c309..e60ce2c 100644 --- a/config/crd/bases/operator.conduit.io_conduits.yaml +++ b/config/crd/bases/operator.conduit.io_conduits.yaml @@ -65,6 +65,8 @@ spec: type: string plugin: type: string + pluginURL: + type: string settings: items: properties: @@ -177,6 +179,8 @@ spec: type: string plugin: type: string + pluginURL: + type: string settings: items: properties: From a24302da583254912651f7ad0c7fd92332e90cd2 Mon Sep 17 00:00:00 2001 From: Nathan Stehr Date: Mon, 9 Jun 2025 15:20:31 -0400 Subject: [PATCH 2/7] renaming to processor url --- api/v1alpha/conduit_types.go | 12 ++++++------ charts/conduit-operator/templates/crd.yaml | 4 ++-- config/crd/bases/operator.conduit.io_conduits.yaml | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/api/v1alpha/conduit_types.go b/api/v1alpha/conduit_types.go index 2528821..26a4b36 100644 --- a/api/v1alpha/conduit_types.go +++ b/api/v1alpha/conduit_types.go @@ -91,12 +91,12 @@ type ConduitConnector struct { } type ConduitProcessor struct { - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Plugin string `json:"plugin,omitempty"` - PluginURL string `json:"pluginURL,omitempty"` - Condition string `json:"condition,omitempty"` - Workers int `json:"workers,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Plugin string `json:"plugin,omitempty"` + ProcessorURL string `json:"processorURL,omitempty"` + Condition string `json:"condition,omitempty"` + Workers int `json:"workers,omitempty"` Settings []SettingsVar `json:"settings,omitempty"` } diff --git a/charts/conduit-operator/templates/crd.yaml b/charts/conduit-operator/templates/crd.yaml index d3dd130..347e47d 100644 --- a/charts/conduit-operator/templates/crd.yaml +++ b/charts/conduit-operator/templates/crd.yaml @@ -76,7 +76,7 @@ spec: type: string plugin: type: string - pluginURL: + processorURL: type: string settings: items: @@ -190,7 +190,7 @@ spec: type: string plugin: type: string - pluginURL: + processorURL: type: string settings: items: diff --git a/config/crd/bases/operator.conduit.io_conduits.yaml b/config/crd/bases/operator.conduit.io_conduits.yaml index e60ce2c..ab22a58 100644 --- a/config/crd/bases/operator.conduit.io_conduits.yaml +++ b/config/crd/bases/operator.conduit.io_conduits.yaml @@ -65,7 +65,7 @@ spec: type: string plugin: type: string - pluginURL: + processorURL: type: string settings: items: @@ -179,7 +179,7 @@ spec: type: string plugin: type: string - pluginURL: + processorURL: type: string settings: items: From 747124398654fb4a01c7a9d2cfd18ee918ce918c Mon Sep 17 00:00:00 2001 From: Nathan Stehr Date: Wed, 11 Jun 2025 09:04:58 -0400 Subject: [PATCH 3/7] working deployment of custom processors --- internal/controller/conduit_containers.go | 103 +++++++++++++++++++--- 1 file changed, 89 insertions(+), 14 deletions(-) diff --git a/internal/controller/conduit_containers.go b/internal/controller/conduit_containers.go index 7053319..387988e 100644 --- a/internal/controller/conduit_containers.go +++ b/internal/controller/conduit_containers.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "log" "path" "path/filepath" "strings" @@ -17,12 +18,18 @@ const ( builderTempPath = "/tmp/connectors" ) -type commandBuilder struct { +type connectorCommandBuilder struct { sync.Mutex done map[string]bool builds []connectorBuild // ordered } +type processorCommandBuilder struct { + sync.Mutex + done map[string]bool + builds []processorBuild // ordered +} + type connectorBuild struct { name string goPkg string @@ -31,6 +38,12 @@ type connectorBuild struct { ldflags string } +type processorBuild struct { + targetDir string + procUrl string + name string +} + func (cb *connectorBuild) steps() []string { return []string{ fmt.Sprint("mkdir -p ", cb.tmpDir, " ", cb.targetDir), @@ -44,7 +57,15 @@ func (cb *connectorBuild) steps() []string { } } -func (c *commandBuilder) renderScript() string { +func (pb *processorBuild) steps() []string { + return []string{ + fmt.Sprintf( + "wget %s -P %s", pb.procUrl, pb.targetDir, + ), + } +} + +func (c *connectorCommandBuilder) renderScript() string { var final []string for _, build := range c.builds { final = append(final, build.steps()...) @@ -52,13 +73,13 @@ func (c *commandBuilder) renderScript() string { return strings.Join(final, " && ") } -func (c *commandBuilder) empty() bool { +func (c *connectorCommandBuilder) empty() bool { c.Lock() defer c.Unlock() return len(c.builds) == 0 } -func (c *commandBuilder) addConnectorBuild(b connectorBuild) { +func (c *connectorCommandBuilder) addConnectorBuild(b connectorBuild) { c.Lock() defer c.Unlock() @@ -74,9 +95,40 @@ func (c *commandBuilder) addConnectorBuild(b connectorBuild) { c.done[b.goPkg] = true } +func (p *processorCommandBuilder) renderScript() string { + var final []string + for _, build := range p.builds { + final = append(final, build.steps()...) + } + return strings.Join(final, " && ") +} + +func (p *processorCommandBuilder) empty() bool { + p.Lock() + defer p.Unlock() + return len(p.builds) == 0 +} + +func (p *processorCommandBuilder) addProcessorBuild(b processorBuild) { + p.Lock() + defer p.Unlock() + + if p.done == nil { + p.done = make(map[string]bool) + } + + if _, ok := p.done[b.name]; ok { + return + } + + p.builds = append(p.builds, b) + p.done[b.name] = true +} + // ConduitInitContainers returns a slice of kubernetes container definitions func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { - builder := &commandBuilder{} + builder := &connectorCommandBuilder{} + pBuilder := &processorCommandBuilder{} containers := []corev1.Container{ { @@ -97,16 +149,21 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { } for _, c := range cc { - if strings.HasPrefix(c.Plugin, "builtin") { - continue + if !strings.HasPrefix(c.Plugin, "builtin") { + builder.addConnectorBuild(connectorBuild{ + name: fmt.Sprintf("%s-%s", filepath.Base(c.Plugin), c.PluginVersion), + goPkg: c.PluginPkg, + tmpDir: builderTempPath, + targetDir: v1alpha.ConduitConnectorsPath, + ldflags: fmt.Sprintf(`-ldflags "-X 'github.com/%s.version=%s'"`, c.Plugin, c.PluginVersion), + }) + } + for _, p := range c.Processors { + log.Println(p) + if !strings.HasPrefix(p.Plugin, "builtin") && p.ProcessorURL != "" { + pBuilder.addProcessorBuild(processorBuild{name: p.Plugin, procUrl: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) + } } - builder.addConnectorBuild(connectorBuild{ - name: fmt.Sprintf("%s-%s", filepath.Base(c.Plugin), c.PluginVersion), - goPkg: c.PluginPkg, - tmpDir: builderTempPath, - targetDir: v1alpha.ConduitConnectorsPath, - ldflags: fmt.Sprintf(`-ldflags "-X 'github.com/%s.version=%s'"`, c.Plugin, c.PluginVersion), - }) } if !builder.empty() { @@ -127,6 +184,24 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { }) } + if !pBuilder.empty() { + containers = append(containers, corev1.Container{ + Name: fmt.Sprint(v1alpha.ConduitInitContainerName, "-processors"), + Image: v1alpha.ConduitInitImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", pBuilder.renderScript(), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: v1alpha.ConduitStorageVolumeMount, + MountPath: v1alpha.ConduitVolumePath, + }, + }, + }) + } + return containers } From b9a8cb513a0f0a5d64486cfcd9b3b6185a14ae25 Mon Sep 17 00:00:00 2001 From: Nathan Stehr Date: Wed, 11 Jun 2025 15:34:07 -0400 Subject: [PATCH 4/7] some refactoring and first pass of tests --- internal/controller/conduit_containers.go | 91 +++----- .../controller/conduit_containers_test.go | 211 +++++++++++++++++- 2 files changed, 243 insertions(+), 59 deletions(-) diff --git a/internal/controller/conduit_containers.go b/internal/controller/conduit_containers.go index 387988e..0ec7135 100644 --- a/internal/controller/conduit_containers.go +++ b/internal/controller/conduit_containers.go @@ -18,16 +18,15 @@ const ( builderTempPath = "/tmp/connectors" ) -type connectorCommandBuilder struct { - sync.Mutex - done map[string]bool - builds []connectorBuild // ordered +type Buildable interface { + steps() []string + key() string } -type processorCommandBuilder struct { +type commandBuilder[T Buildable] struct { sync.Mutex done map[string]bool - builds []processorBuild // ordered + builds []T // ordered } type connectorBuild struct { @@ -38,13 +37,7 @@ type connectorBuild struct { ldflags string } -type processorBuild struct { - targetDir string - procUrl string - name string -} - -func (cb *connectorBuild) steps() []string { +func (cb connectorBuild) steps() []string { return []string{ fmt.Sprint("mkdir -p ", cb.tmpDir, " ", cb.targetDir), fmt.Sprint( @@ -57,78 +50,61 @@ func (cb *connectorBuild) steps() []string { } } -func (pb *processorBuild) steps() []string { +func (cb connectorBuild) key() string { + return cb.goPkg +} + +type processorBuild struct { + targetDir string + procUrl string + name string +} + +func (pb processorBuild) steps() []string { return []string{ fmt.Sprintf( - "wget %s -P %s", pb.procUrl, pb.targetDir, + "wget -O %s/%s %s", pb.targetDir, filepath.Base(pb.procUrl), pb.procUrl, ), } } -func (c *connectorCommandBuilder) renderScript() string { +func (pb processorBuild) key() string { + return pb.name +} + +func (c *commandBuilder[T]) renderScript() string { var final []string for _, build := range c.builds { final = append(final, build.steps()...) } + log.Println(strings.Join(final, " && ")) return strings.Join(final, " && ") } -func (c *connectorCommandBuilder) empty() bool { +func (c *commandBuilder[T]) empty() bool { c.Lock() defer c.Unlock() return len(c.builds) == 0 } -func (c *connectorCommandBuilder) addConnectorBuild(b connectorBuild) { +func (c *commandBuilder[T]) addBuild(b T) { c.Lock() defer c.Unlock() - if c.done == nil { c.done = make(map[string]bool) } - - if _, ok := c.done[b.goPkg]; ok { + key := b.key() + if _, ok := c.done[key]; ok { return } - c.builds = append(c.builds, b) - c.done[b.goPkg] = true -} - -func (p *processorCommandBuilder) renderScript() string { - var final []string - for _, build := range p.builds { - final = append(final, build.steps()...) - } - return strings.Join(final, " && ") -} - -func (p *processorCommandBuilder) empty() bool { - p.Lock() - defer p.Unlock() - return len(p.builds) == 0 -} - -func (p *processorCommandBuilder) addProcessorBuild(b processorBuild) { - p.Lock() - defer p.Unlock() - - if p.done == nil { - p.done = make(map[string]bool) - } - - if _, ok := p.done[b.name]; ok { - return - } - - p.builds = append(p.builds, b) - p.done[b.name] = true + c.done[key] = true } // ConduitInitContainers returns a slice of kubernetes container definitions func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { - builder := &connectorCommandBuilder{} - pBuilder := &processorCommandBuilder{} + builder := &commandBuilder[connectorBuild]{} + pBuilder := &commandBuilder[processorBuild]{} containers := []corev1.Container{ { @@ -150,7 +126,7 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { for _, c := range cc { if !strings.HasPrefix(c.Plugin, "builtin") { - builder.addConnectorBuild(connectorBuild{ + builder.addBuild(connectorBuild{ name: fmt.Sprintf("%s-%s", filepath.Base(c.Plugin), c.PluginVersion), goPkg: c.PluginPkg, tmpDir: builderTempPath, @@ -159,9 +135,8 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { }) } for _, p := range c.Processors { - log.Println(p) if !strings.HasPrefix(p.Plugin, "builtin") && p.ProcessorURL != "" { - pBuilder.addProcessorBuild(processorBuild{name: p.Plugin, procUrl: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) + pBuilder.addBuild(processorBuild{name: p.Plugin, procUrl: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) } } } diff --git a/internal/controller/conduit_containers_test.go b/internal/controller/conduit_containers_test.go index 4828488..d0febf3 100644 --- a/internal/controller/conduit_containers_test.go +++ b/internal/controller/conduit_containers_test.go @@ -14,7 +14,7 @@ import ( "github.com/conduitio/conduit-operator/pkg/conduit" ) -func Test_ConduitInitContainers(t *testing.T) { +func Test_ConduitInitConnectorContainers(t *testing.T) { initContainer := corev1.Container{ Name: "conduit-init", Image: "golang:1.23-alpine", @@ -192,6 +192,215 @@ func Test_ConduitInitContainers(t *testing.T) { } } +func Test_ConduitInitProcessorsContainers(t *testing.T) { + initContainer := corev1.Container{ + Name: "conduit-init", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", "-c", + "mkdir -p /conduit.storage/processors /conduit.storage/connectors", + }, + VolumeMounts: []corev1.VolumeMount{{Name: "conduit-storage", MountPath: "/conduit.storage"}}, + } + + tests := []struct { + name string + connectors []*v1alpha.ConduitConnector + imageVer string + want []corev1.Container + }{ + { + name: "only builtin processors", + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "builtin:builtin-processor", + }, + }, + }, + }, + want: []corev1.Container{initContainer}, + }, + { + name: "standalone processor but no URL", + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + }, + }, + }, + }, + want: []corev1.Container{initContainer}, + }, + { + name: "standalone processor with URL", + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "same standalone processor in multiple connectors", + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + { + Plugin: "builtin:builtin-test1", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "multiple standalone processors", + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + { + Plugin: "standalone-processor1", + ProcessorURL: "http://example.com/processor1", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor && wget -O /conduit.storage/processors/processor1 http://example.com/processor1", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "multiple processors - one builtin, one standalone", + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + { + Plugin: "builtin:builtin-processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := ConduitInitContainers(tc.connectors) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("container mismatch (-want +got): %v", diff) + } + }) + } +} + func Test_ConduitRuntimeContainer(t *testing.T) { flags := conduit.NewFlags( conduit.WithPipelineFile(v1alpha.ConduitPipelineFile), From b931f500fac9a074b54c6652f61c14dd7a5652b1 Mon Sep 17 00:00:00 2001 From: Nathan Stehr Date: Wed, 11 Jun 2025 15:43:50 -0400 Subject: [PATCH 5/7] supporting standalone processors for pipeline level processors --- internal/controller/conduit_containers.go | 9 ++++++-- .../controller/conduit_containers_test.go | 23 ++++++++++++------- internal/controller/conduit_controller.go | 2 +- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/internal/controller/conduit_containers.go b/internal/controller/conduit_containers.go index 0ec7135..8b6c73c 100644 --- a/internal/controller/conduit_containers.go +++ b/internal/controller/conduit_containers.go @@ -102,7 +102,7 @@ func (c *commandBuilder[T]) addBuild(b T) { } // ConduitInitContainers returns a slice of kubernetes container definitions -func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { +func ConduitInitContainers(cc []*v1alpha.ConduitConnector, cp []*v1alpha.ConduitProcessor) []corev1.Container { builder := &commandBuilder[connectorBuild]{} pBuilder := &commandBuilder[processorBuild]{} @@ -140,7 +140,12 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { } } } - + // handles processors that are not part of a connector + for _, p := range cp { + if !strings.HasPrefix(p.Plugin, "builtin") && p.ProcessorURL != "" { + pBuilder.addBuild(processorBuild{name: p.Plugin, procUrl: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) + } + } if !builder.empty() { containers = append(containers, corev1.Container{ Name: fmt.Sprint(v1alpha.ConduitInitContainerName, "-connectors"), diff --git a/internal/controller/conduit_containers_test.go b/internal/controller/conduit_containers_test.go index d0febf3..53bfdbd 100644 --- a/internal/controller/conduit_containers_test.go +++ b/internal/controller/conduit_containers_test.go @@ -184,7 +184,7 @@ func Test_ConduitInitConnectorContainers(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - got := ConduitInitContainers(tc.connectors) + got := ConduitInitContainers(tc.connectors, []*v1alpha.ConduitProcessor{}) if diff := cmp.Diff(tc.want, got); diff != "" { t.Fatalf("container mismatch (-want +got): %v", diff) } @@ -207,11 +207,13 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { tests := []struct { name string connectors []*v1alpha.ConduitConnector + processors []*v1alpha.ConduitProcessor imageVer string want []corev1.Container }{ { - name: "only builtin processors", + name: "only builtin processors", + processors: []*v1alpha.ConduitProcessor{}, connectors: []*v1alpha.ConduitConnector{ { Plugin: "builtin:builtin-test", @@ -226,7 +228,8 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { want: []corev1.Container{initContainer}, }, { - name: "standalone processor but no URL", + name: "standalone processor but no URL", + processors: []*v1alpha.ConduitProcessor{}, connectors: []*v1alpha.ConduitConnector{ { Plugin: "builtin:builtin-test", @@ -241,7 +244,8 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { want: []corev1.Container{initContainer}, }, { - name: "standalone processor with URL", + name: "standalone processor with URL", + processors: []*v1alpha.ConduitProcessor{}, connectors: []*v1alpha.ConduitConnector{ { Plugin: "builtin:builtin-test", @@ -274,7 +278,8 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { }, }, { - name: "same standalone processor in multiple connectors", + name: "same standalone processor in multiple connectors", + processors: []*v1alpha.ConduitProcessor{}, connectors: []*v1alpha.ConduitConnector{ { Plugin: "builtin:builtin-test", @@ -317,7 +322,8 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { }, }, { - name: "multiple standalone processors", + name: "multiple standalone processors", + processors: []*v1alpha.ConduitProcessor{}, connectors: []*v1alpha.ConduitConnector{ { Plugin: "builtin:builtin-test", @@ -354,7 +360,8 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { }, }, { - name: "multiple processors - one builtin, one standalone", + name: "multiple processors - one builtin, one standalone", + processors: []*v1alpha.ConduitProcessor{}, connectors: []*v1alpha.ConduitConnector{ { Plugin: "builtin:builtin-test", @@ -393,7 +400,7 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - got := ConduitInitContainers(tc.connectors) + got := ConduitInitContainers(tc.connectors, tc.processors) if diff := cmp.Diff(tc.want, got); diff != "" { t.Fatalf("container mismatch (-want +got): %v", diff) } diff --git a/internal/controller/conduit_controller.go b/internal/controller/conduit_controller.go index c814192..e66ba18 100644 --- a/internal/controller/conduit_controller.go +++ b/internal/controller/conduit_controller.go @@ -394,7 +394,7 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1. ObjectMeta: metav1.ObjectMeta{}, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyAlways, - InitContainers: ConduitInitContainers(c.Spec.Connectors), + InitContainers: ConduitInitContainers(c.Spec.Connectors, c.Spec.Processors), Containers: []corev1.Container{ container, }, From 9d7d3dc849c4fd651d978d223449ee69b4b661c2 Mon Sep 17 00:00:00 2001 From: Nathan Stehr Date: Wed, 11 Jun 2025 17:03:42 -0400 Subject: [PATCH 6/7] tests for processor init container --- .../controller/conduit_containers_test.go | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/internal/controller/conduit_containers_test.go b/internal/controller/conduit_containers_test.go index 53bfdbd..d0422b2 100644 --- a/internal/controller/conduit_containers_test.go +++ b/internal/controller/conduit_containers_test.go @@ -396,6 +396,117 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { }, }, }, + { + name: "standalone processor at pipeline level", + processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "same standalone processor at pipeline level and at connector level", + processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "different standalone processor at pipeline level and at connector level", + processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor1", + ProcessorURL: "http://example.com/processor1", + }, + }, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor && wget -O /conduit.storage/processors/processor1 http://example.com/processor1", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, } for _, tc := range tests { From 73327f0778d8c8ac940d872ca9fba5f26316c330 Mon Sep 17 00:00:00 2001 From: Nathan Stehr Date: Thu, 12 Jun 2025 10:29:48 -0400 Subject: [PATCH 7/7] slight refactor --- internal/controller/conduit_containers.go | 19 ++++++++----------- .../controller/conduit_containers_test.go | 2 +- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/internal/controller/conduit_containers.go b/internal/controller/conduit_containers.go index 8b6c73c..90f6f62 100644 --- a/internal/controller/conduit_containers.go +++ b/internal/controller/conduit_containers.go @@ -2,7 +2,6 @@ package controller import ( "fmt" - "log" "path" "path/filepath" "strings" @@ -56,14 +55,14 @@ func (cb connectorBuild) key() string { type processorBuild struct { targetDir string - procUrl string + procURL string name string } func (pb processorBuild) steps() []string { return []string{ fmt.Sprintf( - "wget -O %s/%s %s", pb.targetDir, filepath.Base(pb.procUrl), pb.procUrl, + "wget -O %s/%s %s", pb.targetDir, filepath.Base(pb.procURL), pb.procURL, ), } } @@ -77,7 +76,6 @@ func (c *commandBuilder[T]) renderScript() string { for _, build := range c.builds { final = append(final, build.steps()...) } - log.Println(strings.Join(final, " && ")) return strings.Join(final, " && ") } @@ -124,6 +122,7 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector, cp []*v1alpha.Conduit }, } + allProcessors := cp for _, c := range cc { if !strings.HasPrefix(c.Plugin, "builtin") { builder.addBuild(connectorBuild{ @@ -134,16 +133,14 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector, cp []*v1alpha.Conduit ldflags: fmt.Sprintf(`-ldflags "-X 'github.com/%s.version=%s'"`, c.Plugin, c.PluginVersion), }) } - for _, p := range c.Processors { - if !strings.HasPrefix(p.Plugin, "builtin") && p.ProcessorURL != "" { - pBuilder.addBuild(processorBuild{name: p.Plugin, procUrl: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) - } + if len(c.Processors) > 0 { + allProcessors = append(allProcessors, c.Processors...) } } - // handles processors that are not part of a connector - for _, p := range cp { + // handles standalone processors that need WASM downloaded + for _, p := range allProcessors { if !strings.HasPrefix(p.Plugin, "builtin") && p.ProcessorURL != "" { - pBuilder.addBuild(processorBuild{name: p.Plugin, procUrl: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) + pBuilder.addBuild(processorBuild{name: p.Plugin, procURL: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) } } if !builder.empty() { diff --git a/internal/controller/conduit_containers_test.go b/internal/controller/conduit_containers_test.go index d0422b2..09738b9 100644 --- a/internal/controller/conduit_containers_test.go +++ b/internal/controller/conduit_containers_test.go @@ -496,7 +496,7 @@ func Test_ConduitInitProcessorsContainers(t *testing.T) { Args: []string{ "sh", "-xe", "-c", - "wget -O /conduit.storage/processors/processor http://example.com/processor && wget -O /conduit.storage/processors/processor1 http://example.com/processor1", + "wget -O /conduit.storage/processors/processor1 http://example.com/processor1 && wget -O /conduit.storage/processors/processor http://example.com/processor", }, VolumeMounts: []corev1.VolumeMount{ {