Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
name: Release
on:
push:
tags:
- 'v*'
- '!v*-beta'
release:
types: [published]

permissions:
id-token: write # Required for OIDC
Expand Down
2 changes: 1 addition & 1 deletion api/handler_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ func getKafkaInfo(config *asyncapi3.Config) *runtime.KafkaInfo {

func getKafkaInfoWithGroup(config *asyncapi3.Config, group *store.Group) *runtime.KafkaInfo {
s := store.New(config, enginetest.NewEngine(), &eventstest.Handler{}, monitor.NewKafka())
g := s.GetOrCreateGroup(group.Name, 0)
g := s.GetOrCreateGroup(group.Name, &store.Broker{})
*g = *group
return &runtime.KafkaInfo{
Config: config,
Expand Down
2 changes: 1 addition & 1 deletion imap/idle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestIdle(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "+ idling", res)

time.Sleep(2 * time.Second)
time.Sleep(4 * time.Second)

res, err = c.ReadLine()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion providers/asyncapi3/kafka/store/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type logCleaner func(broker *Broker)

type Brokers map[int]*Broker
type Brokers []*Broker

type Broker struct {
Id int
Expand Down
2 changes: 1 addition & 1 deletion providers/asyncapi3/kafka/store/joingroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *Store) joingroup(rw kafka.ResponseWriter, req *kafka.Request) error {
return rw.Write(res)
}

g := s.GetOrCreateGroup(r.GroupId, b.Id)
g := s.GetOrCreateGroup(r.GroupId, b)

ctx.AddGroup(g.Name, r.MemberId)

Expand Down
6 changes: 3 additions & 3 deletions providers/asyncapi3/kafka/store/listgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestListGroup(t *testing.T) {
"with group state",
func(t *testing.T, s *store.Store) {
s.Update(asyncapi3test.NewConfig(asyncapi3test.WithServer("", "kafka", "127.0.0.1")))
group := s.GetOrCreateGroup("foo", 0)
group := s.GetOrCreateGroup("foo", &store.Broker{})
group.State = store.PreparingRebalance
g := group.NewGeneration()
g.Members[""] = &store.Member{}
Expand All @@ -56,8 +56,8 @@ func TestListGroup(t *testing.T) {
"filtering",
func(t *testing.T, s *store.Store) {
s.Update(asyncapi3test.NewConfig(asyncapi3test.WithServer("", "kafka", "127.0.0.1")))
s.GetOrCreateGroup("foo", 0)
group := s.GetOrCreateGroup("bar", 0)
s.GetOrCreateGroup("foo", &store.Broker{})
group := s.GetOrCreateGroup("bar", &store.Broker{})
group.State = store.CompletingRebalance
g := group.NewGeneration()
g.Members[""] = &store.Member{}
Expand Down
27 changes: 14 additions & 13 deletions providers/asyncapi3/kafka/store/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestPartition(t *testing.T) {
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {},
func(record *kafka.Record, schemaId int) bool { return false },
&Topic{},
Expand All @@ -30,7 +30,7 @@ func TestPartition_Write(t *testing.T) {
var logs []int64
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {
logs = append(logs, log.Offset)
},
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestPartition_Write(t *testing.T) {
func TestPartition_Read_Empty(t *testing.T) {
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {},
func(record *kafka.Record, schemaId int) bool { return false },
&Topic{},
Expand All @@ -90,7 +90,7 @@ func TestPartition_Read_Empty(t *testing.T) {
func TestPartition_Read(t *testing.T) {
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {},
func(record *kafka.Record, schemaId int) bool { return false },
&Topic{},
Expand All @@ -117,7 +117,7 @@ func TestPartition_Read(t *testing.T) {
func TestPartition_Read_OutOfOffset_Empty(t *testing.T) {
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {},
func(record *kafka.Record, schemaId int) bool { return false },
&Topic{},
Expand All @@ -130,7 +130,7 @@ func TestPartition_Read_OutOfOffset_Empty(t *testing.T) {
func TestPartition_Read_OutOfOffset(t *testing.T) {
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {},
func(record *kafka.Record, schemaId int) bool { return false },
&Topic{},
Expand All @@ -154,7 +154,7 @@ func TestPartition_Read_OutOfOffset(t *testing.T) {
func TestPartition_Write_Value_Validator(t *testing.T) {
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, _ events.Traits) {
}, func(record *kafka.Record, schemaId int) bool { return false },
&Topic{Config: &asyncapi3.Channel{Bindings: asyncapi3.ChannelBindings{
Expand Down Expand Up @@ -210,14 +210,15 @@ func TestPartition_Write_Value_Validator(t *testing.T) {
require.Equal(t, int64(0), wr.BaseOffset)
require.Equal(t, int64(1), p.Offset())
require.Equal(t, int64(0), p.StartOffset())
record := p.Segments[p.ActiveSegment].record(0)
require.Len(t, record.Headers, 1)
require.Equal(t, "bar-1", record.Headers[0].Key)
require.Equal(t, []byte("foobar"), record.Headers[0].Value)
r := p.Segments[p.ActiveSegment].record(0)
require.NotNil(t, r)
require.Len(t, r.Headers, 1)
require.Equal(t, "bar-1", r.Headers[0].Key)
require.Equal(t, []byte("foobar"), r.Headers[0].Value)
}

func TestPatition_Retention(t *testing.T) {
p := newPartition(0, map[int]*Broker{1: {Id: 1}},
p := newPartition(0, []*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {},
func(record *kafka.Record, schemaId int) bool { return false },
&Topic{},
Expand Down Expand Up @@ -264,7 +265,7 @@ func TestPartition_Write_Producer_ClientId(t *testing.T) {
var logs []*KafkaMessageLog
p := newPartition(
0,
map[int]*Broker{1: {Id: 1}},
[]*Broker{{Id: 1}},
func(log *KafkaMessageLog, traits events.Traits) {
logs = append(logs, log)
},
Expand Down
4 changes: 2 additions & 2 deletions providers/asyncapi3/kafka/store/produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestProduce(t *testing.T) {
s.Update(asyncapi3test.NewConfig(
asyncapi3test.WithServer("foo", "kafka", "127.0.0.1"),
asyncapi3test.WithChannel("foo")))
g := s.GetOrCreateGroup("foo", 0)
g := s.GetOrCreateGroup("foo", &store.Broker{})
g.Commit("foo", 0, 0)
sm.SetStore(5, events.NewTraits().WithNamespace("kafka"))

Expand Down Expand Up @@ -564,7 +564,7 @@ func TestProduceTriggersEvent(t *testing.T) {
s.Update(asyncapi3test.NewConfig(
asyncapi3test.WithServer("foo", "kafka", "127.0.0.1"),
asyncapi3test.WithChannel("foo")))
g := s.GetOrCreateGroup("foo", 0)
g := s.GetOrCreateGroup("foo", &store.Broker{})
g.Commit("foo", 0, 0)
sm.SetStore(5, events.NewTraits().WithNamespace("kafka"))

Expand Down
36 changes: 15 additions & 21 deletions providers/asyncapi3/kafka/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"mokapi/runtime/monitor"
"net"
"net/url"
"slices"
"strconv"
"sync"
"time"
Expand All @@ -32,7 +33,7 @@ import (
)

type Store struct {
brokers map[int]*Broker
brokers []*Broker
topics map[string]*Topic
groups map[string]*Group
cluster string
Expand All @@ -54,7 +55,7 @@ type ProducerState struct {
func NewEmpty(eventEmitter common.EventEmitter, eh events.Handler, monitor *monitor.Kafka) *Store {
return &Store{
topics: make(map[string]*Topic),
brokers: make(map[int]*Broker),
brokers: []*Broker{},
groups: make(map[string]*Group),
eventEmitter: eventEmitter,
eh: eh,
Expand Down Expand Up @@ -99,11 +100,6 @@ func (s *Store) Topics() []*Topic {
return topics
}

func (s *Store) Broker(id int) (*Broker, bool) {
b, ok := s.brokers[id]
return b, ok
}

func (s *Store) Brokers() []*Broker {
brokers := make([]*Broker, 0, len(s.brokers))
for _, b := range s.brokers {
Expand All @@ -128,20 +124,15 @@ func (s *Store) Group(name string) (*Group, bool) {
return g, ok
}

func (s *Store) GetOrCreateGroup(name string, brokerId int) *Group {
func (s *Store) GetOrCreateGroup(name string, broker *Broker) *Group {
s.m.Lock()
defer s.m.Unlock()

b, ok := s.Broker(brokerId)
if !ok {
panic(fmt.Sprintf("unknown broker id: %v", brokerId))
}

if g, ok := s.groups[name]; ok {
return g
}

g := s.newGroup(name, b)
g := s.newGroup(name, broker)
s.groups[name] = g
return g
}
Expand All @@ -168,8 +159,11 @@ func (s *Store) Update(c *asyncapi3.Config) {
}
}
for _, b := range s.brokers {
if b == nil {
continue
}
if _, ok := c.Servers.Get(b.Name); !ok {
s.deleteBroker(b.Id)
s.deleteBroker(b)
}
}
}
Expand Down Expand Up @@ -298,18 +292,18 @@ func (s *Store) addBroker(name string, config *asyncapi3.Server) {
id := len(s.brokers)
b := newBroker(id, name, config)

s.brokers[id] = b
s.brokers = append(s.brokers, b)
b.startCleaner(s.cleanLog)
}

func (s *Store) deleteBroker(id int) {
func (s *Store) deleteBroker(broker *Broker) {
s.m.Lock()
defer s.m.Unlock()

if b, ok := s.brokers[id]; ok {
b.stopCleaner()
}
delete(s.brokers, id)
broker.stopCleaner()
s.brokers = slices.DeleteFunc(s.brokers, func(b *Broker) bool {
return b.Name == broker.Name
})
}

func (s *Store) getBroker(name string) *Broker {
Expand Down
6 changes: 3 additions & 3 deletions providers/asyncapi3/kafka/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func TestStore(t *testing.T) {
require.Equal(t, 1, len(s.Brokers()))
require.Equal(t, 0, len(s.Topics()))
require.Equal(t, 0, len(s.Groups()))
b, ok := s.Broker(0)
require.Equal(t, true, ok)
require.Equal(t, "foo", b.Name)
list := s.Brokers()
require.Len(t, list, 1)
require.Equal(t, "foo", list[0].Name)
},
},
{
Expand Down
2 changes: 1 addition & 1 deletion providers/asyncapi3/kafka/store/syncgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s *Store) syncgroup(rw kafka.ResponseWriter, req *kafka.Request) error {
if len(r.MemberId) != 0 {
b := s.getBrokerByPort(req.Host)
if b != nil {
g := s.GetOrCreateGroup(r.GroupId, b.Id)
g := s.GetOrCreateGroup(r.GroupId, b)

if g.State != PreparingRebalance {
if g.Generation == nil || g.Generation.Id != int(r.GenerationId) {
Expand Down
Loading