From c880dc87621c23e2d25bcfeffea011a5b0ee8daf Mon Sep 17 00:00:00 2001 From: maesi Date: Mon, 2 Feb 2026 19:31:51 +0100 Subject: [PATCH 1/3] switch to release trigger --- .github/workflows/release.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e889dea64..d5e39cc46 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,9 +1,7 @@ name: Release on: - push: - tags: - - 'v*' - - '!v*-beta' + release: + types: [published] permissions: id-token: write # Required for OIDC From 3f303b87444b4f9fb518bb124d057151f8f977d0 Mon Sep 17 00:00:00 2001 From: maesi Date: Mon, 2 Feb 2026 20:19:05 +0100 Subject: [PATCH 2/3] fix data structure for brokers in Kafka store --- api/handler_kafka_test.go | 2 +- providers/asyncapi3/kafka/store/broker.go | 2 +- providers/asyncapi3/kafka/store/joingroup.go | 2 +- .../asyncapi3/kafka/store/listgroup_test.go | 6 ++-- .../asyncapi3/kafka/store/partition_test.go | 27 +++++++------- .../asyncapi3/kafka/store/produce_test.go | 4 +-- providers/asyncapi3/kafka/store/store.go | 36 ++++++++----------- providers/asyncapi3/kafka/store/store_test.go | 6 ++-- providers/asyncapi3/kafka/store/syncgroup.go | 2 +- 9 files changed, 41 insertions(+), 46 deletions(-) diff --git a/api/handler_kafka_test.go b/api/handler_kafka_test.go index 2a5a071c4..9d158e3bf 100644 --- a/api/handler_kafka_test.go +++ b/api/handler_kafka_test.go @@ -1105,7 +1105,7 @@ func getKafkaInfo(config *asyncapi3.Config) *runtime.KafkaInfo { func getKafkaInfoWithGroup(config *asyncapi3.Config, group *store.Group) *runtime.KafkaInfo { s := store.New(config, enginetest.NewEngine(), &eventstest.Handler{}, monitor.NewKafka()) - g := s.GetOrCreateGroup(group.Name, 0) + g := s.GetOrCreateGroup(group.Name, &store.Broker{}) *g = *group return &runtime.KafkaInfo{ Config: config, diff --git a/providers/asyncapi3/kafka/store/broker.go b/providers/asyncapi3/kafka/store/broker.go index 1732aaa48..b1383be9d 100644 --- a/providers/asyncapi3/kafka/store/broker.go +++ b/providers/asyncapi3/kafka/store/broker.go @@ -8,7 +8,7 @@ import ( type logCleaner func(broker *Broker) -type Brokers map[int]*Broker +type Brokers []*Broker type Broker struct { Id int diff --git a/providers/asyncapi3/kafka/store/joingroup.go b/providers/asyncapi3/kafka/store/joingroup.go index bec24bf07..b7b572748 100644 --- a/providers/asyncapi3/kafka/store/joingroup.go +++ b/providers/asyncapi3/kafka/store/joingroup.go @@ -37,7 +37,7 @@ func (s *Store) joingroup(rw kafka.ResponseWriter, req *kafka.Request) error { return rw.Write(res) } - g := s.GetOrCreateGroup(r.GroupId, b.Id) + g := s.GetOrCreateGroup(r.GroupId, b) ctx.AddGroup(g.Name, r.MemberId) diff --git a/providers/asyncapi3/kafka/store/listgroup_test.go b/providers/asyncapi3/kafka/store/listgroup_test.go index 3961fc14d..5a187421d 100644 --- a/providers/asyncapi3/kafka/store/listgroup_test.go +++ b/providers/asyncapi3/kafka/store/listgroup_test.go @@ -36,7 +36,7 @@ func TestListGroup(t *testing.T) { "with group state", func(t *testing.T, s *store.Store) { s.Update(asyncapi3test.NewConfig(asyncapi3test.WithServer("", "kafka", "127.0.0.1"))) - group := s.GetOrCreateGroup("foo", 0) + group := s.GetOrCreateGroup("foo", &store.Broker{}) group.State = store.PreparingRebalance g := group.NewGeneration() g.Members[""] = &store.Member{} @@ -56,8 +56,8 @@ func TestListGroup(t *testing.T) { "filtering", func(t *testing.T, s *store.Store) { s.Update(asyncapi3test.NewConfig(asyncapi3test.WithServer("", "kafka", "127.0.0.1"))) - s.GetOrCreateGroup("foo", 0) - group := s.GetOrCreateGroup("bar", 0) + s.GetOrCreateGroup("foo", &store.Broker{}) + group := s.GetOrCreateGroup("bar", &store.Broker{}) group.State = store.CompletingRebalance g := group.NewGeneration() g.Members[""] = &store.Member{} diff --git a/providers/asyncapi3/kafka/store/partition_test.go b/providers/asyncapi3/kafka/store/partition_test.go index 773be892f..b43cb61db 100644 --- a/providers/asyncapi3/kafka/store/partition_test.go +++ b/providers/asyncapi3/kafka/store/partition_test.go @@ -15,7 +15,7 @@ import ( func TestPartition(t *testing.T) { p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, @@ -30,7 +30,7 @@ func TestPartition_Write(t *testing.T) { var logs []int64 p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) { logs = append(logs, log.Offset) }, @@ -77,7 +77,7 @@ func TestPartition_Write(t *testing.T) { func TestPartition_Read_Empty(t *testing.T) { p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, @@ -90,7 +90,7 @@ func TestPartition_Read_Empty(t *testing.T) { func TestPartition_Read(t *testing.T) { p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, @@ -117,7 +117,7 @@ func TestPartition_Read(t *testing.T) { func TestPartition_Read_OutOfOffset_Empty(t *testing.T) { p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, @@ -130,7 +130,7 @@ func TestPartition_Read_OutOfOffset_Empty(t *testing.T) { func TestPartition_Read_OutOfOffset(t *testing.T) { p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, @@ -154,7 +154,7 @@ func TestPartition_Read_OutOfOffset(t *testing.T) { func TestPartition_Write_Value_Validator(t *testing.T) { p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, _ events.Traits) { }, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{Config: &asyncapi3.Channel{Bindings: asyncapi3.ChannelBindings{ @@ -210,14 +210,15 @@ func TestPartition_Write_Value_Validator(t *testing.T) { require.Equal(t, int64(0), wr.BaseOffset) require.Equal(t, int64(1), p.Offset()) require.Equal(t, int64(0), p.StartOffset()) - record := p.Segments[p.ActiveSegment].record(0) - require.Len(t, record.Headers, 1) - require.Equal(t, "bar-1", record.Headers[0].Key) - require.Equal(t, []byte("foobar"), record.Headers[0].Value) + r := p.Segments[p.ActiveSegment].record(0) + require.NotNil(t, r) + require.Len(t, r.Headers, 1) + require.Equal(t, "bar-1", r.Headers[0].Key) + require.Equal(t, []byte("foobar"), r.Headers[0].Value) } func TestPatition_Retention(t *testing.T) { - p := newPartition(0, map[int]*Broker{1: {Id: 1}}, + p := newPartition(0, []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, @@ -264,7 +265,7 @@ func TestPartition_Write_Producer_ClientId(t *testing.T) { var logs []*KafkaMessageLog p := newPartition( 0, - map[int]*Broker{1: {Id: 1}}, + []*Broker{{Id: 1}}, func(log *KafkaMessageLog, traits events.Traits) { logs = append(logs, log) }, diff --git a/providers/asyncapi3/kafka/store/produce_test.go b/providers/asyncapi3/kafka/store/produce_test.go index 7c7ff7ead..2641f3094 100644 --- a/providers/asyncapi3/kafka/store/produce_test.go +++ b/providers/asyncapi3/kafka/store/produce_test.go @@ -35,7 +35,7 @@ func TestProduce(t *testing.T) { s.Update(asyncapi3test.NewConfig( asyncapi3test.WithServer("foo", "kafka", "127.0.0.1"), asyncapi3test.WithChannel("foo"))) - g := s.GetOrCreateGroup("foo", 0) + g := s.GetOrCreateGroup("foo", &store.Broker{}) g.Commit("foo", 0, 0) sm.SetStore(5, events.NewTraits().WithNamespace("kafka")) @@ -564,7 +564,7 @@ func TestProduceTriggersEvent(t *testing.T) { s.Update(asyncapi3test.NewConfig( asyncapi3test.WithServer("foo", "kafka", "127.0.0.1"), asyncapi3test.WithChannel("foo"))) - g := s.GetOrCreateGroup("foo", 0) + g := s.GetOrCreateGroup("foo", &store.Broker{}) g.Commit("foo", 0, 0) sm.SetStore(5, events.NewTraits().WithNamespace("kafka")) diff --git a/providers/asyncapi3/kafka/store/store.go b/providers/asyncapi3/kafka/store/store.go index 31363cb06..445bd4e66 100644 --- a/providers/asyncapi3/kafka/store/store.go +++ b/providers/asyncapi3/kafka/store/store.go @@ -23,6 +23,7 @@ import ( "mokapi/runtime/monitor" "net" "net/url" + "slices" "strconv" "sync" "time" @@ -32,7 +33,7 @@ import ( ) type Store struct { - brokers map[int]*Broker + brokers []*Broker topics map[string]*Topic groups map[string]*Group cluster string @@ -54,7 +55,7 @@ type ProducerState struct { func NewEmpty(eventEmitter common.EventEmitter, eh events.Handler, monitor *monitor.Kafka) *Store { return &Store{ topics: make(map[string]*Topic), - brokers: make(map[int]*Broker), + brokers: []*Broker{}, groups: make(map[string]*Group), eventEmitter: eventEmitter, eh: eh, @@ -99,11 +100,6 @@ func (s *Store) Topics() []*Topic { return topics } -func (s *Store) Broker(id int) (*Broker, bool) { - b, ok := s.brokers[id] - return b, ok -} - func (s *Store) Brokers() []*Broker { brokers := make([]*Broker, 0, len(s.brokers)) for _, b := range s.brokers { @@ -128,20 +124,15 @@ func (s *Store) Group(name string) (*Group, bool) { return g, ok } -func (s *Store) GetOrCreateGroup(name string, brokerId int) *Group { +func (s *Store) GetOrCreateGroup(name string, broker *Broker) *Group { s.m.Lock() defer s.m.Unlock() - b, ok := s.Broker(brokerId) - if !ok { - panic(fmt.Sprintf("unknown broker id: %v", brokerId)) - } - if g, ok := s.groups[name]; ok { return g } - g := s.newGroup(name, b) + g := s.newGroup(name, broker) s.groups[name] = g return g } @@ -168,8 +159,11 @@ func (s *Store) Update(c *asyncapi3.Config) { } } for _, b := range s.brokers { + if b == nil { + continue + } if _, ok := c.Servers.Get(b.Name); !ok { - s.deleteBroker(b.Id) + s.deleteBroker(b) } } } @@ -298,18 +292,18 @@ func (s *Store) addBroker(name string, config *asyncapi3.Server) { id := len(s.brokers) b := newBroker(id, name, config) - s.brokers[id] = b + s.brokers = append(s.brokers, b) b.startCleaner(s.cleanLog) } -func (s *Store) deleteBroker(id int) { +func (s *Store) deleteBroker(broker *Broker) { s.m.Lock() defer s.m.Unlock() - if b, ok := s.brokers[id]; ok { - b.stopCleaner() - } - delete(s.brokers, id) + broker.stopCleaner() + s.brokers = slices.DeleteFunc(s.brokers, func(b *Broker) bool { + return b.Name == broker.Name + }) } func (s *Store) getBroker(name string) *Broker { diff --git a/providers/asyncapi3/kafka/store/store_test.go b/providers/asyncapi3/kafka/store/store_test.go index c885b3048..23dea9e47 100644 --- a/providers/asyncapi3/kafka/store/store_test.go +++ b/providers/asyncapi3/kafka/store/store_test.go @@ -40,9 +40,9 @@ func TestStore(t *testing.T) { require.Equal(t, 1, len(s.Brokers())) require.Equal(t, 0, len(s.Topics())) require.Equal(t, 0, len(s.Groups())) - b, ok := s.Broker(0) - require.Equal(t, true, ok) - require.Equal(t, "foo", b.Name) + list := s.Brokers() + require.Len(t, list, 1) + require.Equal(t, "foo", list[0].Name) }, }, { diff --git a/providers/asyncapi3/kafka/store/syncgroup.go b/providers/asyncapi3/kafka/store/syncgroup.go index 8a601230b..3d19d4ac3 100644 --- a/providers/asyncapi3/kafka/store/syncgroup.go +++ b/providers/asyncapi3/kafka/store/syncgroup.go @@ -33,7 +33,7 @@ func (s *Store) syncgroup(rw kafka.ResponseWriter, req *kafka.Request) error { if len(r.MemberId) != 0 { b := s.getBrokerByPort(req.Host) if b != nil { - g := s.GetOrCreateGroup(r.GroupId, b.Id) + g := s.GetOrCreateGroup(r.GroupId, b) if g.State != PreparingRebalance { if g.Generation == nil || g.Generation.Id != int(r.GenerationId) { From 5c92a247f452512b0d5e2bf4c2bb57c2d7aa6ba9 Mon Sep 17 00:00:00 2001 From: maesi Date: Mon, 2 Feb 2026 20:29:54 +0100 Subject: [PATCH 3/3] improve test timer --- imap/idle_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/imap/idle_test.go b/imap/idle_test.go index 5e0376806..e7547bb36 100644 --- a/imap/idle_test.go +++ b/imap/idle_test.go @@ -178,7 +178,7 @@ func TestIdle(t *testing.T) { require.NoError(t, err) require.Equal(t, "+ idling", res) - time.Sleep(2 * time.Second) + time.Sleep(4 * time.Second) res, err = c.ReadLine() require.NoError(t, err)