diff --git a/internal/controller/conduit_containers.go b/internal/controller/conduit_containers.go index 7053319..90f6f62 100644 --- a/internal/controller/conduit_containers.go +++ b/internal/controller/conduit_containers.go @@ -17,10 +17,15 @@ const ( builderTempPath = "/tmp/connectors" ) -type commandBuilder struct { +type Buildable interface { + steps() []string + key() string +} + +type commandBuilder[T Buildable] struct { sync.Mutex done map[string]bool - builds []connectorBuild // ordered + builds []T // ordered } type connectorBuild struct { @@ -31,7 +36,7 @@ type connectorBuild struct { ldflags string } -func (cb *connectorBuild) steps() []string { +func (cb connectorBuild) steps() []string { return []string{ fmt.Sprint("mkdir -p ", cb.tmpDir, " ", cb.targetDir), fmt.Sprint( @@ -44,7 +49,29 @@ func (cb *connectorBuild) steps() []string { } } -func (c *commandBuilder) renderScript() string { +func (cb connectorBuild) key() string { + return cb.goPkg +} + +type processorBuild struct { + targetDir string + procURL string + name string +} + +func (pb processorBuild) steps() []string { + return []string{ + fmt.Sprintf( + "wget -O %s/%s %s", pb.targetDir, filepath.Base(pb.procURL), pb.procURL, + ), + } +} + +func (pb processorBuild) key() string { + return pb.name +} + +func (c *commandBuilder[T]) renderScript() string { var final []string for _, build := range c.builds { final = append(final, build.steps()...) @@ -52,31 +79,30 @@ func (c *commandBuilder) renderScript() string { return strings.Join(final, " && ") } -func (c *commandBuilder) empty() bool { +func (c *commandBuilder[T]) empty() bool { c.Lock() defer c.Unlock() return len(c.builds) == 0 } -func (c *commandBuilder) addConnectorBuild(b connectorBuild) { +func (c *commandBuilder[T]) addBuild(b T) { c.Lock() defer c.Unlock() - if c.done == nil { c.done = make(map[string]bool) } - - if _, ok := c.done[b.goPkg]; ok { + key := b.key() + if _, ok := c.done[key]; ok { return } - c.builds = append(c.builds, b) - c.done[b.goPkg] = true + c.done[key] = true } // ConduitInitContainers returns a slice of kubernetes container definitions -func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { - builder := &commandBuilder{} +func ConduitInitContainers(cc []*v1alpha.ConduitConnector, cp []*v1alpha.ConduitProcessor) []corev1.Container { + builder := &commandBuilder[connectorBuild]{} + pBuilder := &commandBuilder[processorBuild]{} containers := []corev1.Container{ { @@ -96,19 +122,27 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { }, } + allProcessors := cp for _, c := range cc { - if strings.HasPrefix(c.Plugin, "builtin") { - continue + if !strings.HasPrefix(c.Plugin, "builtin") { + builder.addBuild(connectorBuild{ + name: fmt.Sprintf("%s-%s", filepath.Base(c.Plugin), c.PluginVersion), + goPkg: c.PluginPkg, + tmpDir: builderTempPath, + targetDir: v1alpha.ConduitConnectorsPath, + ldflags: fmt.Sprintf(`-ldflags "-X 'github.com/%s.version=%s'"`, c.Plugin, c.PluginVersion), + }) + } + if len(c.Processors) > 0 { + allProcessors = append(allProcessors, c.Processors...) + } + } + // handles standalone processors that need WASM downloaded + for _, p := range allProcessors { + if !strings.HasPrefix(p.Plugin, "builtin") && p.ProcessorURL != "" { + pBuilder.addBuild(processorBuild{name: p.Plugin, procURL: p.ProcessorURL, targetDir: v1alpha.ConduitProcessorsPath}) } - builder.addConnectorBuild(connectorBuild{ - name: fmt.Sprintf("%s-%s", filepath.Base(c.Plugin), c.PluginVersion), - goPkg: c.PluginPkg, - tmpDir: builderTempPath, - targetDir: v1alpha.ConduitConnectorsPath, - ldflags: fmt.Sprintf(`-ldflags "-X 'github.com/%s.version=%s'"`, c.Plugin, c.PluginVersion), - }) } - if !builder.empty() { containers = append(containers, corev1.Container{ Name: fmt.Sprint(v1alpha.ConduitInitContainerName, "-connectors"), @@ -127,6 +161,24 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container { }) } + if !pBuilder.empty() { + containers = append(containers, corev1.Container{ + Name: fmt.Sprint(v1alpha.ConduitInitContainerName, "-processors"), + Image: v1alpha.ConduitInitImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", pBuilder.renderScript(), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: v1alpha.ConduitStorageVolumeMount, + MountPath: v1alpha.ConduitVolumePath, + }, + }, + }) + } + return containers } diff --git a/internal/controller/conduit_containers_test.go b/internal/controller/conduit_containers_test.go index 4828488..09738b9 100644 --- a/internal/controller/conduit_containers_test.go +++ b/internal/controller/conduit_containers_test.go @@ -14,7 +14,7 @@ import ( "github.com/conduitio/conduit-operator/pkg/conduit" ) -func Test_ConduitInitContainers(t *testing.T) { +func Test_ConduitInitConnectorContainers(t *testing.T) { initContainer := corev1.Container{ Name: "conduit-init", Image: "golang:1.23-alpine", @@ -184,7 +184,334 @@ func Test_ConduitInitContainers(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - got := ConduitInitContainers(tc.connectors) + got := ConduitInitContainers(tc.connectors, []*v1alpha.ConduitProcessor{}) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("container mismatch (-want +got): %v", diff) + } + }) + } +} + +func Test_ConduitInitProcessorsContainers(t *testing.T) { + initContainer := corev1.Container{ + Name: "conduit-init", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", "-c", + "mkdir -p /conduit.storage/processors /conduit.storage/connectors", + }, + VolumeMounts: []corev1.VolumeMount{{Name: "conduit-storage", MountPath: "/conduit.storage"}}, + } + + tests := []struct { + name string + connectors []*v1alpha.ConduitConnector + processors []*v1alpha.ConduitProcessor + imageVer string + want []corev1.Container + }{ + { + name: "only builtin processors", + processors: []*v1alpha.ConduitProcessor{}, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "builtin:builtin-processor", + }, + }, + }, + }, + want: []corev1.Container{initContainer}, + }, + { + name: "standalone processor but no URL", + processors: []*v1alpha.ConduitProcessor{}, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + }, + }, + }, + }, + want: []corev1.Container{initContainer}, + }, + { + name: "standalone processor with URL", + processors: []*v1alpha.ConduitProcessor{}, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "same standalone processor in multiple connectors", + processors: []*v1alpha.ConduitProcessor{}, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + { + Plugin: "builtin:builtin-test1", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "multiple standalone processors", + processors: []*v1alpha.ConduitProcessor{}, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + { + Plugin: "standalone-processor1", + ProcessorURL: "http://example.com/processor1", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor && wget -O /conduit.storage/processors/processor1 http://example.com/processor1", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "multiple processors - one builtin, one standalone", + processors: []*v1alpha.ConduitProcessor{}, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + { + Plugin: "builtin:builtin-processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "standalone processor at pipeline level", + processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "same standalone processor at pipeline level and at connector level", + processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + { + name: "different standalone processor at pipeline level and at connector level", + processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor1", + ProcessorURL: "http://example.com/processor1", + }, + }, + connectors: []*v1alpha.ConduitConnector{ + { + Plugin: "builtin:builtin-test", + PluginVersion: "latest", + Processors: []*v1alpha.ConduitProcessor{ + { + Plugin: "standalone-processor", + ProcessorURL: "http://example.com/processor", + }, + }, + }, + }, + want: []corev1.Container{ + initContainer, { + Name: "conduit-init-processors", + Image: "golang:1.23-alpine", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", + "-c", + "wget -O /conduit.storage/processors/processor1 http://example.com/processor1 && wget -O /conduit.storage/processors/processor http://example.com/processor", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + }, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := ConduitInitContainers(tc.connectors, tc.processors) if diff := cmp.Diff(tc.want, got); diff != "" { t.Fatalf("container mismatch (-want +got): %v", diff) } diff --git a/internal/controller/conduit_controller.go b/internal/controller/conduit_controller.go index c814192..e66ba18 100644 --- a/internal/controller/conduit_controller.go +++ b/internal/controller/conduit_controller.go @@ -394,7 +394,7 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1. ObjectMeta: metav1.ObjectMeta{}, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyAlways, - InitContainers: ConduitInitContainers(c.Spec.Connectors), + InitContainers: ConduitInitContainers(c.Spec.Connectors, c.Spec.Processors), Containers: []corev1.Container{ container, },