Hi, @bounoable!
How can we create a projection from multiple aggregates?
Let's say we have User and Order aggregates (each with some events):
type UserAggregate struct {
	id   string
	name string
	// ...
}

type OrderAggregate struct {
	id     string
	userID string
	// ...
}

The goal is to create a User projection:
type UserProjection struct {
	id     string
	name   string
	orders []Order
	// ...
}

type Order struct {
	id string
	// ...
}

We also need to store this projection in the database. Here is our current projector:
type Projector struct {
	repo Repository
}

func (x Projector) Project(ctx context.Context, bus event.Bus, store event.Store) (<-chan error, error) {
	s := schedule.Continuously(bus, store, []string{
		user_created_event.Name,
		user_renamed_event.Name,
		order_created_event.Name,
	}, schedule.Debounce(time.Second))

	return s.Subscribe(ctx, func(job projection.Job) error {
		refs, errs, err := job.Aggregates(ctx, user_aggregate.Name)
		if err != nil {
			return fmt.Errorf("failed to extract user aggregates: %w", err)
		}

		return streams.Walk(ctx, func(ref aggregate.Ref) error {
			var proj *UserProjection
			if proj, err = x.repo.Fetch(ctx, ref.ID); err != nil {
				switch {
				case errors.Is(err, model.ErrNotFound):
					proj = NewUserProjection(ref.ID)
				default:
					return fmt.Errorf("failed to fetch user projection: %w", err)
				}
			}

			if err = job.Apply(ctx, proj); err != nil {
				return fmt.Errorf("failed to apply events to user projection: %w", err)
			}

			if err = x.repo.Save(ctx, proj); err != nil {
				return fmt.Errorf("failed to save user projection: %w", err)
			}

			return nil
		}, refs, errs)
	}, projection.Startup())
}

There are two problems with the current solution:
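- Order events won't trigger projection updates, because the job only extracts refs for user_aggregate.Name;
- There is a gap between fetching the projection and saving it, so a concurrent update could be overwritten.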
Could you please suggest the right approach using goes? We don't want to reinvent the wheel. Thank you!