2 changes: 1 addition & 1 deletion .github/assets/Operator-Design.svg
20 changes: 20 additions & 0 deletions pkg/cache/object.go
@@ -32,6 +32,8 @@ type Object struct {

// Domain returns the hash-based testing domain for preview deployments, or an
// empty string for main releases and non-testing environments.
//
// 1d0fd508.lite.testing.splits.org
func (o Object) Domain(env string) string {
// Note that we filter the domain name creation by preview deployments,
// because at the time of writing we do not have any convenient way to tell
@@ -58,6 +60,24 @@ func (o Object) Domain(env string) string {
)
}

// Drift tries to detect a single valid state drift, in order to allow the
// operator chain to execute. Our current policy requires the following
// conditions to be true for a valid state drift.
//
// 1. the desired deployment must not be suspended
//
// 2. the current and desired state must not be equal
//
// 3. the desired state must not be empty
//
// 4. the container image for the desired state must be pushed
func (o Object) Drift() bool {
return !bool(o.Release.Deploy.Suspend) &&
o.Artifact.Drift() &&
!o.Artifact.Empty() &&
o.Artifact.Valid()
}
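
For context, a minimal sketch of how a caller might filter on this policy. The drifted helper below is hypothetical and not part of the diff; the real operator consults the policy package instead.

```go
// drifted is a hypothetical helper that keeps only the cache objects
// whose state drift qualifies for reconciliation under the policy above.
func drifted(all []Object) []Object {
	var out []Object
	for _, o := range all {
		if o.Drift() {
			out = append(out, o)
		}
	}
	return out
}
```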

func (o Object) Name() string {
if o.kin == Infrastructure {
return o.Release.Github.String()
102 changes: 102 additions & 0 deletions pkg/cache/object_test.go
@@ -4,11 +4,113 @@ import (
"fmt"
"testing"

"github.com/0xSplits/kayron/pkg/release/artifact"
"github.com/0xSplits/kayron/pkg/release/artifact/condition"
"github.com/0xSplits/kayron/pkg/release/artifact/reference"
"github.com/0xSplits/kayron/pkg/release/artifact/scheduler"
"github.com/0xSplits/kayron/pkg/release/schema/release"
"github.com/0xSplits/kayron/pkg/release/schema/release/deploy"
"github.com/0xSplits/kayron/pkg/release/schema/release/deploy/suspend"
"github.com/0xSplits/kayron/pkg/release/schema/release/docker"
"github.com/0xSplits/kayron/pkg/release/schema/release/github"
"github.com/google/go-cmp/cmp"
)

func Test_Cache_Object_Drift(t *testing.T) {
Contributor Author: I moved the Drift method onto the release artifact / cache object to make it reusable. The tests here just moved packages; we already had them in a different place.

testCases := []struct {
rel Object
dft bool
}{
// Case 000, empty release, no drift
{
rel: Object{},
dft: false,
},
// Case 001, no drift
{
rel: Object{
Artifact: artifact.Struct{
Condition: condition.Struct{
Success: true,
},
Scheduler: scheduler.Struct{
Current: "foo",
},
Reference: reference.Struct{
Desired: "foo",
},
},
},
dft: false,
},
// Case 002, drift, failed condition
{
rel: Object{
Artifact: artifact.Struct{
Condition: condition.Struct{
Success: false,
},
Scheduler: scheduler.Struct{
Current: "foo",
},
Reference: reference.Struct{
Desired: "bar",
},
},
},
dft: false,
},
// Case 003, drift
{
rel: Object{
Artifact: artifact.Struct{
Condition: condition.Struct{
Success: true,
},
Scheduler: scheduler.Struct{
Current: "foo",
},
Reference: reference.Struct{
Desired: "bar",
},
},
},
dft: true,
},
// Case 004, drift, suspended
{
rel: Object{
Artifact: artifact.Struct{
Condition: condition.Struct{
Success: true,
},
Scheduler: scheduler.Struct{
Current: "foo",
},
Reference: reference.Struct{
Desired: "bar",
},
},
Release: release.Struct{
Deploy: deploy.Struct{
Suspend: suspend.Bool(true),
},
},
},
dft: false,
},
}

for i, tc := range testCases {
t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) {
dft := tc.rel.Drift()
if dif := cmp.Diff(tc.dft, dft); dif != "" {
t.Fatalf("-expected +actual:\n%s", dif)
}
})
}
}

func Test_Cache_Object_Parameter(t *testing.T) {
testCases := []struct {
obj Object
15 changes: 9 additions & 6 deletions pkg/operator/chain.go
@@ -46,9 +46,8 @@ func (o *Operator) Chain() [][]handler.Ensure {
// our service releases.
{o.registry},

- // Run the next steps in parallel in order to find the current and
- // desired state of the release artifacts that we are tasked to
- // managed.
+ // Run the next steps in parallel and split the execution graph based on the
+ // different underlying responsibilities.
//
// 1. Once the current and desired states of the runnable service
// releases are known to have drifted apart, we can fetch the current
Expand All @@ -57,10 +56,14 @@ func (o *Operator) Chain() [][]handler.Ensure {
// that this operator function is only active in case of valid state
// drift.
//
- // 2. Manage any relevant deployment status for visibility purposes. E.g.
- // emit log messages and create or update pull request comments etc.
+ // 2. Emit log messages about the internal state of the system. This
+ // operator function runs on each and every reconciliation loop.
+ //
+ // 3. Manage any relevant deployment status for visibility purposes.
+ // E.g. create and update pull request comments about any preview
+ // deployment status.
//
- {o.infrastructure, o.status},
+ {o.infrastructure, o.logging, o.status},
Contributor Author: Here we wire up the operator chain.
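
To illustrate the assumed execution semantics of the chain, here is a hypothetical executor. It presumes that handler.Ensure exposes Ensure() error and Active() bool, that inner slices run concurrently, and that outer slices run in order; none of this wiring is shown in the diff, and the handler import path is a guess.

```go
package operator

import (
	"golang.org/x/sync/errgroup"

	"github.com/0xSplits/kayron/pkg/worker/handler" // assumed import path
)

// runChain is a hypothetical executor: outer groups run sequentially,
// handlers within a group run concurrently, and inactive handlers are
// skipped for the current reconciliation loop.
func runChain(chain [][]handler.Ensure) error {
	for _, group := range chain {
		var eg errgroup.Group

		for _, han := range group {
			if !han.Active() { // assumes Active() bool on the interface
				continue
			}

			eg.Go(han.Ensure) // assumes Ensure() error on the interface
		}

		if err := eg.Wait(); err != nil {
			return err
		}
	}

	return nil
}
```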


// With the CloudFormation templates uploaded to S3, we can construct the
// payload to update the CloudFormation stack that we are responsible for.
87 changes: 81 additions & 6 deletions pkg/operator/container/task.go
@@ -3,6 +3,7 @@ package container
import (
"context"
"slices"
"sort"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ecs"
@@ -60,18 +61,73 @@ func (c *Container) task(det []detail) ([]task, error) {
}

for _, x := range out.Services {
// For our reconciliation of the current state to make sense, we have to
// ensure that the services being managed are actually controlled by ECS.
// Without this check we may end up with the wrong kind of deployment
// information, which we rely on to be provided in a certain way below.

if x.DeploymentController.Type != types.DeploymentControllerTypeEcs {
c.log.Log(
"level", "warning",
"message", "skipping reconciliation for ECS service",
"reason", "service does not use ECS controller",
"cluster", *x.ClusterArn,
"service", *x.ServiceArn,
)

{
continue
}
}

// There might be inactive or draining services with our desired service
// labels in case we updated CloudFormation stacks multiple times with
// preview deployments during testing. We only want to consider the
// current state of those stacks that are still active, because the
// inactive versions have most likely been deleted already.

if aws.ToString(x.Status) != "ACTIVE" {
c.log.Log(
"level", "debug",
"message", "skipping reconciliation for ECS service",
"reason", "ECS service is inactive",
"cluster", *x.ClusterArn,
"service", *x.ServiceArn,
)

{
continue
}
}

// Try to find the task definition that is successfully deployed. Here we
// prevent prematurely picking new task definitions that are still
// transitioning during deployments.

var arn string
{
arn = tasArn(x.Deployments)
}

if arn == "" {
c.log.Log(
"level", "debug",
"message", "skipping reconciliation for ECS service",
"reason", "ECS service has no completed task definition",
"cluster", *x.ClusterArn,
"service", *x.ServiceArn,
)

{
continue
}
}

// Try to identify the service and task definition based on their resource
// tags. If a service is not labelled with our desired 'service' tag,
// then we cannot work with it properly moving forward.

var pre string
var ser string
{
@@ -97,7 +153,7 @@
// executes for its own index.

tas[i] = task{
- arn: *x.TaskDefinition,
+ arn: arn,
pre: pre,
ser: ser,
}
@@ -148,3 +204,22 @@ func serTag(tag []types.Tag, key string) string {

return ""
}

func tasArn(dep []types.Deployment) string {
// Make sure we can select the newest deployment first, so sort by creation
// time in descending order.

sort.Slice(dep, func(i, j int) bool {
return aws.ToTime(dep[i].CreatedAt).After(aws.ToTime(dep[j].CreatedAt))
})

// Now pick the task definition ARN of the first completed deployment.

for _, y := range dep {
if y.RolloutState == types.DeploymentRolloutStateCompleted {
return aws.ToString(y.TaskDefinition)
}
Comment on lines +219 to +221
Contributor Author: Here we fix the current state lookup in the sense that we now only acknowledge the new current state if it comes from a successful deployment. The result is that we keep the Updating state in the status updates until the new deployment finishes, so the issue comments in our pull requests will more accurately reflect the status of the preview deployments.

}

return ""
}
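
A quick, made-up illustration of the selection behaviour, assuming the imports already used in task.go plus the standard time package:

```go
// The newer deployment is still rolling out, so tasArn falls back to
// the older, completed deployment. All values here are fabricated.
dep := []types.Deployment{
	{
		TaskDefinition: aws.String("arn:aws:ecs:eu-west-1:123456789012:task-definition/app:42"),
		CreatedAt:      aws.Time(time.Now()),
		RolloutState:   types.DeploymentRolloutStateInProgress,
	},
	{
		TaskDefinition: aws.String("arn:aws:ecs:eu-west-1:123456789012:task-definition/app:41"),
		CreatedAt:      aws.Time(time.Now().Add(-time.Hour)),
		RolloutState:   types.DeploymentRolloutStateCompleted,
	},
}

arn := tasArn(dep) // yields the :41 task definition ARN
```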
6 changes: 6 additions & 0 deletions pkg/operator/logging/active.go
@@ -0,0 +1,6 @@
package logging

// Active marks this worker handler as always executed.
func (l *Logging) Active() bool {
return true
}
Comment on lines +1 to +6
Contributor Author: I created a dedicated operator function for logging some information. That code separation makes everything cleaner and easier to work with.

52 changes: 52 additions & 0 deletions pkg/operator/logging/ensure.go
@@ -0,0 +1,52 @@
package logging

import (
"github.com/0xSplits/kayron/pkg/cache"
)

func (l *Logging) Ensure() error {
var can bool
{
can = l.pol.Cancel()
}

if can {
l.log.Log(
"level", "info",
"message", "deployment in progress",
)

return nil
}

var dft []cache.Object
{
dft = l.pol.Drift()
}

// If we do not have any drifted release artifacts, then this means that all
// service releases were found to be up to date this time around.

if len(dft) == 0 {
l.log.Log(
"level", "info",
"message", "no state drift",
)
}

// If we do have drifted release artifacts, then we want to create an info log
// message for each affected service release.

for _, x := range dft {
l.log.Log(
"level", "info",
"message", "detected state drift",
"release", x.Name(),
"preview", x.Release.Labels.Hash.Upper(),
"domain", x.Domain(l.env.Environment),
"version", x.Artifact.Reference.Desired,
)
}

return nil
}
42 changes: 42 additions & 0 deletions pkg/operator/logging/logging.go
@@ -0,0 +1,42 @@
// Package logging emits log messages for each and every reconciliation loop
// about the most important aspects of the current state of the system.
package logging

import (
"fmt"

"github.com/0xSplits/kayron/pkg/envvar"
"github.com/0xSplits/kayron/pkg/policy"
"github.com/xh3b4sd/logger"
"github.com/xh3b4sd/tracer"
)

type Config struct {
Env envvar.Env
Log logger.Interface
Pol *policy.Policy
}

type Logging struct {
env envvar.Env
log logger.Interface
pol *policy.Policy
}

func New(c Config) *Logging {
if c.Env.Environment == "" {
tracer.Panic(tracer.Mask(fmt.Errorf("%T.Env must not be empty", c)))
}
if c.Log == nil {
tracer.Panic(tracer.Mask(fmt.Errorf("%T.Log must not be empty", c)))
}
if c.Pol == nil {
tracer.Panic(tracer.Mask(fmt.Errorf("%T.Pol must not be empty", c)))
}

return &Logging{
env: c.Env,
log: c.Log,
pol: c.Pol,
}
}
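
For completeness, a sketch of how the handler might be constructed; the env, log and pol variables are assumptions standing in for the operator's real setup code, which is not part of this diff.

```go
// Hypothetical wiring of the logging handler into the operator.
han := logging.New(logging.Config{
	Env: env, // envvar.Env with a non-empty Environment
	Log: log, // logger.Interface implementation
	Pol: pol, // *policy.Policy shared with the other handlers
})
```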