From dba8fdacb7da4c17cb89dc1c7849ebf165238258 Mon Sep 17 00:00:00 2001 From: maesi Date: Sat, 24 Jan 2026 23:08:56 +0100 Subject: [PATCH 01/85] remove npm token --- .github/workflows/release.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d134255d9..8f8c6904e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -52,8 +52,6 @@ jobs: TAP_GITHUB_TOKEN: ${{ secrets.TAP_GITHUB_TOKEN }} - name: Publish npm run: task publish-npm-package VERSION=${GITHUB_REF##*/v} - env: - NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} release-windows: runs-on: windows-latest From 716833624e81dda57fe35309fb02479204c0b6d4 Mon Sep 17 00:00:00 2001 From: maesi Date: Sun, 25 Jan 2026 07:58:35 +0100 Subject: [PATCH 02/85] fix typo in names --- .../{mardown-blockquote.ts => markdown-blockquote.ts} | 0 webui/src/composables/{mardown-links.ts => markdown-links.ts} | 0 webui/src/composables/markdown.ts | 4 ++-- 3 files changed, 2 insertions(+), 2 deletions(-) rename webui/src/composables/{mardown-blockquote.ts => markdown-blockquote.ts} (100%) rename webui/src/composables/{mardown-links.ts => markdown-links.ts} (100%) diff --git a/webui/src/composables/mardown-blockquote.ts b/webui/src/composables/markdown-blockquote.ts similarity index 100% rename from webui/src/composables/mardown-blockquote.ts rename to webui/src/composables/markdown-blockquote.ts diff --git a/webui/src/composables/mardown-links.ts b/webui/src/composables/markdown-links.ts similarity index 100% rename from webui/src/composables/mardown-links.ts rename to webui/src/composables/markdown-links.ts diff --git a/webui/src/composables/markdown.ts b/webui/src/composables/markdown.ts index 247149602..71e5b49e8 100644 --- a/webui/src/composables/markdown.ts +++ b/webui/src/composables/markdown.ts @@ -2,11 +2,11 @@ import MarkdownItHighlightjs from 'markdown-it-highlightjs'; import MarkdownIt from 'markdown-it'; import { MarkdownItTabs } from '@/composables/markdown-tabs'; import { MarkdownItBox } from '@/composables/markdown-box'; -import { MarkdownItLinks } from '@/composables/mardown-links'; +import { MarkdownItLinks } from '@/composables/markdown-links'; import { MarkdownItCard } from '@/composables/markdown-card'; import { MarkdownItCarousel } from './markdown-carousel'; import yaml from 'js-yaml' -import { MarkdownItBlockquote } from './mardown-blockquote'; +import { MarkdownItBlockquote } from './markdown-blockquote'; const images = import.meta.glob('/src/assets/docs/**/*.png', {as: 'url', eager: true}) const metadataRegex = /^---([\s\S]*?)---/; From a2f6fbb9b6df9ecd573cba5700ce74b4c2c81d25 Mon Sep 17 00:00:00 2001 From: maesi Date: Sun, 25 Jan 2026 11:27:05 +0100 Subject: [PATCH 03/85] change Kafka group displaying in dashboard --- webui/e2e/dashboard-demo/kafka.spec.ts | 72 ++++-- .../dashboard/kafka/KafkaGroups.vue | 207 +++--------------- .../dashboard/kafka/KafkaService.vue | 10 +- .../components/dashboard/kafka/KafkaTopic.vue | 74 ++++++- .../dashboard/kafka/KafkaTopicsCard.vue | 1 - webui/src/router/index.ts | 12 + 6 files changed, 157 insertions(+), 219 deletions(-) diff --git a/webui/e2e/dashboard-demo/kafka.spec.ts b/webui/e2e/dashboard-demo/kafka.spec.ts index 9dba22b2c..e12958117 100644 --- a/webui/e2e/dashboard-demo/kafka.spec.ts +++ b/webui/e2e/dashboard-demo/kafka.spec.ts @@ -66,30 +66,54 @@ test('Visit Kafka Order Service', async ({ page }) => { await expect(tooltip.getByLabel('Topics')).toHaveText('order-topic'); await rows.nth(0).getByRole('cell').nth(0).click(); - const dialog = page.getByRole('dialog', { name: 'Group Details' }); - await expect(dialog).toBeVisible(); - await expect(dialog.getByLabel('Name')).toHaveText('order-status-group-100'); - await expect(dialog.getByLabel('State')).toHaveText('Stable'); - await expect(dialog.getByLabel('Protocol')).toHaveText('RoundRobinAssigner'); - await expect(dialog.getByLabel('Coordinator')).toHaveText('localhost:9092'); - await expect(dialog.getByLabel('Leader')).toHaveText(/^producer/); - - await dialog.getByRole('tab', { name: 'Topics' }).click(); - const topics = dialog.getByRole('table', { name: 'Topics' }); - await expect(await getCellByColumnName(topics, 'Topic')).toHaveText('order-topic'); - - await dialog.getByRole('tab', { name: 'Members' }).click(); - const membersPanel = dialog.getByRole('tabpanel', { name: 'Members' }); - await expect(membersPanel).toBeVisible(); - await expect(membersPanel.getByRole('tab', { name: /^producer/ })).toHaveAttribute('aria-selected', 'true'); - await expect(membersPanel.getByLabel('Address')).not.toBeEmpty(); - await expect(membersPanel.getByLabel('Client Software')).toHaveText('-'); - await expect(membersPanel.getByLabel('Heartbeat')).not.toBeEmpty(); - const memberPartitions = membersPanel.getByRole('table', { name: 'Member Partitions' }) - await expect(await getCellByColumnName(memberPartitions, 'Topic')).toHaveText('order-topic'); - await expect(await getCellByColumnName(memberPartitions, 'Partitions')).toHaveText('0'); - - await dialog.getByRole('button', { name: 'Close' }).click(); + await expect(page.getByLabel('Group Name')).toHaveText('order-status-group-100'); + await expect(page.getByLabel('State')).toHaveText('Stable'); + await expect(page.getByLabel('Protocol')).toHaveText('RoundRobinAssigner'); + await expect(page.getByLabel('Coordinator')).toHaveText('localhost:9092'); + await expect(page.getByLabel('Leader', { exact: true })).toHaveText(/^producer/); + + // await dialog.getByRole('tab', { name: 'Topics' }).click(); + // const topics = dialog.getByRole('table', { name: 'Topics' }); + // await expect(await getCellByColumnName(topics, 'Topic')).toHaveText('order-topic'); + + await test.step('Verify Members', async () => { + + const region = page.getByRole('region', { name: 'Members' }); + await expect(region).toBeVisible(); + + const members = region.getByRole('table', { name: 'Members' }); + const rows = members.locator('tbody tr'); + await expect(rows).toHaveCount(1); + await expect((await getCellByColumnName(members, 'Group leader', rows.nth(0))).getByLabel('Group leader')).toBeVisible(); + await expect(await getCellByColumnName(members, 'Name', rows.nth(0))).toHaveText(/^producer/); + await expect(await getCellByColumnName(members, 'Address', rows.nth(0))).not.toBeEmpty(); + await expect(await getCellByColumnName(members, 'Client Software', rows.nth(0))).toHaveText('-'); + await expect(await getCellByColumnName(members, 'Heartbeat', rows.nth(0))).not.toBeEmpty(); + + await test.step('Verify Member', async () => { + + await members.locator('tbody tr').click(); + + await expect(page.getByLabel('Member Name')).toHaveText(/^producer/); + await expect(page.getByLabel('Address')).not.toBeEmpty(); + await expect(page.getByLabel('Client Software')).toHaveText('-'); + await expect(page.getByLabel('Heartbeat')).not.toBeEmpty(); + + const region = page.getByRole('region', { name: 'Partitions' }); + await expect(region).toBeVisible(); + + const table = region.getByRole('table', { name: 'Partitions' }); + const rows = table.locator('tbody tr'); + await expect(rows).toHaveCount(1); + await expect((await getCellByColumnName(table, 'Topic', rows.nth(0)))).toHaveText('order-topic'); + await expect(await getCellByColumnName(table, 'Partition', rows.nth(0))).toHaveText('0'); + + await page.goBack(); + + }); + }); + + await page.goBack(); }); diff --git a/webui/src/components/dashboard/kafka/KafkaGroups.vue b/webui/src/components/dashboard/kafka/KafkaGroups.vue index ffc62edd7..214a63353 100644 --- a/webui/src/components/dashboard/kafka/KafkaGroups.vue +++ b/webui/src/components/dashboard/kafka/KafkaGroups.vue @@ -1,17 +1,21 @@ @@ -106,11 +102,11 @@ function getClientSoftware(member: KafkaMember) { - + - + {{ group.name }} - + {{ group.state }} @@ -132,155 +128,4 @@ function getClientSoftware(member: KafkaMember) { - - - - \ No newline at end of file + \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaService.vue b/webui/src/components/dashboard/kafka/KafkaService.vue index c18c58b2a..5ae276f7e 100644 --- a/webui/src/components/dashboard/kafka/KafkaService.vue +++ b/webui/src/components/dashboard/kafka/KafkaService.vue @@ -8,6 +8,8 @@ import KafkaMessagesCard from './KafkaMessagesCard.vue' import KafkaTopic from './KafkaTopic.vue' import Servers from './Servers.vue' import ConfigCard from '../ConfigCard.vue' +import KafkaGroup from './KafkaGroup.vue' +import KafkaGroupMember from './KafkaGroupMember.vue' import Message from './Message.vue' import { getRouteName, useDashboard } from '@/composables/dashboard'; @@ -45,8 +47,14 @@ if (serviceName){ -
+
+
+ +
+
+ +
\ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaTopic.vue b/webui/src/components/dashboard/kafka/KafkaTopic.vue index ced49f3c4..025c84b5f 100644 --- a/webui/src/components/dashboard/kafka/KafkaTopic.vue +++ b/webui/src/components/dashboard/kafka/KafkaTopic.vue @@ -1,5 +1,5 @@ \ No newline at end of file diff --git a/webui/src/composables/kafka.ts b/webui/src/composables/kafka.ts index 1bbed33bd..026a531bc 100644 --- a/webui/src/composables/kafka.ts +++ b/webui/src/composables/kafka.ts @@ -1,6 +1,6 @@ export function useKafka() { - function clientSoftware(member: KafkaMember) { + function clientSoftware(member: KafkaClient | KafkaMember) { let client = `${member.clientSoftwareName} ${member.clientSoftwareVersion}` if (client === ' ') { client = '-' @@ -8,7 +8,12 @@ export function useKafka() { return client } + function formatAddress(address: string): string { + return address.replace('[::1]', 'localhost') + } + return { - clientSoftware + clientSoftware, + formatAddress } } \ No newline at end of file diff --git a/webui/src/composables/metrics.ts b/webui/src/composables/metrics.ts index 1b2ca6ca5..0e477053b 100644 --- a/webui/src/composables/metrics.ts +++ b/webui/src/composables/metrics.ts @@ -49,5 +49,23 @@ export function useMetrics() { return true } - return {sum, max} + function value(metrics: Metric[], name: string, ...labels: Label[]): number | undefined { + if (!metrics){ + return 0 + } + + for (let metric of metrics) { + if (!metric.name.startsWith(name)) { + continue + } + + if (labels.length == 0 || matchLabels(metric, labels)){ + const n = Number(metric.value) + return n + } + } + return undefined + } + + return {sum, max, value} } \ No newline at end of file diff --git a/webui/src/router/index.ts b/webui/src/router/index.ts index 0e69a9d1b..90c7e383e 100644 --- a/webui/src/router/index.ts +++ b/webui/src/router/index.ts @@ -128,6 +128,12 @@ function createDashboardRoute(mode: 'live' | 'demo'): RouteRecordRaw { name: getRouteName('kafkaMessage'), component: dashboardView, meta: {service: 'kafka'} + }, + { + path: 'service/:service/clients/:clientId', + name: getRouteName('kafkaClient'), + component: dashboardView, + meta: {service: 'kafka'} } ] }, diff --git a/webui/src/types/kafka.d.ts b/webui/src/types/kafka.d.ts index 988104167..149e3b59e 100644 --- a/webui/src/types/kafka.d.ts +++ b/webui/src/types/kafka.d.ts @@ -2,6 +2,7 @@ declare interface KafkaService extends Service { topics: KafkaTopic[]; groups: KafkaGroup[]; servers: KafkaServer[]; + clients: KafkaClient[]; } declare interface KafkaServer { @@ -49,6 +50,7 @@ declare interface KafkaBroker { declare interface KafkaGroup { name: string; + generation: number members: KafkaMember[]; coordinator: string; leader: string; @@ -59,6 +61,7 @@ declare interface KafkaGroup { declare interface KafkaMember { name: string; + clientId: string addr: string; clientSoftwareName: string; clientSoftwareVersion: string; @@ -91,3 +94,14 @@ declare interface KafkaValue { value?: string binary?: string } + +declare interface KafkaClient { + clientId: string + address: string + clientSoftwareName: string; + clientSoftwareVersion: string; + groups: { + memberId: string + group: string + }[] +} \ No newline at end of file From 2019b913a0cd91c82ce4ce3b1ecb351b28424a88 Mon Sep 17 00:00:00 2001 From: maesi Date: Sun, 25 Jan 2026 19:33:54 +0100 Subject: [PATCH 06/85] change Leader handling simplified: - Each partition and group now always reports the current server as leader/coordinator. --- acceptance/petstore_test.go | 4 +-- api/handler_kafka.go | 35 +++++-------------- api/handler_kafka_test.go | 19 +++++----- kafka/client_context.go | 5 +-- kafka/kafkatest/kafkatest.go | 2 +- kafka/server.go | 2 +- .../asyncapi3/kafka/store/find_coordinator.go | 25 ++++++------- providers/asyncapi3/kafka/store/group.go | 10 +++--- providers/asyncapi3/kafka/store/joingroup.go | 3 -- .../asyncapi3/kafka/store/log_cleaner.go | 2 +- providers/asyncapi3/kafka/store/metadata.go | 17 +++------ providers/asyncapi3/kafka/store/partition.go | 23 ++---------- .../asyncapi3/kafka/store/partition_test.go | 2 -- .../asyncapi3/kafka/store/produce_test.go | 6 ++-- providers/asyncapi3/kafka/store/store.go | 5 --- providers/asyncapi3/kafka/store/syncgroup.go | 3 -- webui/e2e/Dashboard/kafka/cluster.ts | 6 ---- webui/e2e/components/kafka.ts | 10 ++---- webui/e2e/dashboard-demo/kafka.spec.ts | 4 --- .../dashboard/kafka/KafkaClient.vue | 6 +++- .../components/dashboard/kafka/KafkaGroup.vue | 18 ++++------ .../dashboard/kafka/KafkaGroupMember.vue | 2 +- .../dashboard/kafka/KafkaGroups.vue | 6 ++-- .../dashboard/kafka/KafkaPartition.vue | 2 -- webui/src/types/kafka.d.ts | 3 +- 25 files changed, 69 insertions(+), 151 deletions(-) diff --git a/acceptance/petstore_test.go b/acceptance/petstore_test.go index 241bc14ed..eb52c4e3e 100644 --- a/acceptance/petstore_test.go +++ b/acceptance/petstore_test.go @@ -120,8 +120,8 @@ func (suite *PetStoreSuite) TestApi() { }, "name": "petstore.order-event", "partitions": []interface{}{ - map[string]interface{}{"id": float64(0), "leader": map[string]interface{}{"addr": "127.0.0.1:19092", "name": "broker"}, "offset": float64(1), "segments": float64(1), "startOffset": float64(0)}, - map[string]interface{}{"id": float64(1), "leader": map[string]interface{}{"addr": "127.0.0.1:19092", "name": "broker"}, "offset": float64(0), "segments": float64(0), "startOffset": float64(0)}, + map[string]interface{}{"id": float64(0), "offset": float64(1), "segments": float64(1), "startOffset": float64(0)}, + map[string]interface{}{"id": float64(1), "offset": float64(0), "segments": float64(0), "startOffset": float64(0)}, }, }}, } diff --git a/api/handler_kafka.go b/api/handler_kafka.go index 2bee33e7d..6ed1c6803 100644 --- a/api/handler_kafka.go +++ b/api/handler_kafka.go @@ -60,7 +60,6 @@ type group struct { Name string `json:"name"` Generation int `json:"generation"` Members []member `json:"members"` - Coordinator string `json:"coordinator"` Leader string `json:"leader"` State string `json:"state"` AssignmentStrategy string `json:"protocol"` @@ -70,6 +69,7 @@ type group struct { type client struct { ClientId string `json:"clientId"` Address string `json:"address"` + BrokerAddress string `json:"brokerAddress"` ClientSoftwareName string `json:"clientSoftwareName"` ClientSoftwareVersion string `json:"clientSoftwareVersion"` Groups []clientGroupMember `json:"groups"` @@ -99,16 +99,10 @@ type topic struct { } type partition struct { - Id int `json:"id"` - StartOffset int64 `json:"startOffset"` - Offset int64 `json:"offset"` - Leader broker `json:"leader"` - Segments int `json:"segments"` -} - -type broker struct { - Name string `json:"name"` - Addr string `json:"addr"` + Id int `json:"id"` + StartOffset int64 `json:"startOffset"` + Offset int64 `json:"offset"` + Segments int `json:"segments"` } type messageConfig struct { @@ -464,6 +458,7 @@ func getKafka(info *runtime.KafkaInfo) kafkaInfo { c := client{ ClientId: ctx.ClientId, Address: ctx.Addr, + BrokerAddress: ctx.ServerAddress, ClientSoftwareName: ctx.ClientSoftwareName, ClientSoftwareVersion: ctx.ClientSoftwareVersion, } @@ -595,10 +590,9 @@ func getPartitions(t *store.Topic) []partition { func newGroup(g *store.Group) group { grp := group{ - Name: g.Name, - Generation: g.Generation.Id, - State: g.State.String(), - Coordinator: g.Coordinator.Addr(), + Name: g.Name, + Generation: g.Generation.Id, + State: g.State.String(), } if g.Generation != nil { grp.Leader = g.Generation.LeaderId @@ -634,21 +628,10 @@ func newPartition(p *store.Partition) partition { Id: p.Index, StartOffset: p.StartOffset(), Offset: p.Offset(), - Leader: newBroker(p.Leader), Segments: len(p.Segments), } } -func newBroker(b *store.Broker) broker { - if b == nil { - return broker{} - } - return broker{ - Name: b.Name, - Addr: b.Addr(), - } -} - func getKafkaClusters(app *runtime.App) []cluster { var clusters []cluster for _, k := range app.Kafka.List() { diff --git a/api/handler_kafka_test.go b/api/handler_kafka_test.go index d0a81705b..8d6fbc1f3 100644 --- a/api/handler_kafka_test.go +++ b/api/handler_kafka_test.go @@ -174,7 +174,7 @@ func TestHandler_Kafka(t *testing.T) { })) }, requestUrl: "http://foo.api/api/services/kafka/foo", - responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"name":"foo","payload":{"schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`, + responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"segments":0}],"messages":{"foo":{"name":"foo","payload":{"schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`, }, { name: "get specific with topic and multi schema format", @@ -197,7 +197,7 @@ func TestHandler_Kafka(t *testing.T) { })) }, requestUrl: "http://foo.api/api/services/kafka/foo", - responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"name":"foo","payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`, + responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"segments":0}],"messages":{"foo":{"name":"foo","payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`, }, { name: "get specific with group", @@ -222,7 +222,7 @@ func TestHandler_Kafka(t *testing.T) { return app }, requestUrl: "http://foo.api/api/services/kafka/foo", - responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":""}],"groups":[{"name":"foo","generation":3,"members":null,"coordinator":"foo.bar:9092","leader":"","state":"PreparingRebalance","protocol":"range","topics":null}]}`, + responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":""}],"groups":[{"name":"foo","generation":3,"members":null,"leader":"","state":"PreparingRebalance","protocol":"range","topics":null}]}`, }, { name: "get specific with group containing members", @@ -278,7 +278,7 @@ func TestHandler_Kafka(t *testing.T) { return app }, requestUrl: "http://foo.api/api/services/kafka/foo", - responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":""}],"groups":[{"name":"foo","generation":3,"members":[{"name":"m1","clientId":"client1","addr":"192.168.0.100","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:05+07:00","partitions":{"topic":[1,2,5]}},{"name":"m2","clientId":"client2","addr":"192.168.0.200","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:10+07:00","partitions":{"topic":[3,4,6]}}],"coordinator":"foo.bar:9092","leader":"m1","state":"PreparingRebalance","protocol":"range","topics":null}]}`, + responseBody: `{"name":"foo","description":"bar","version":"1.0","servers":[{"name":"foo","host":"foo.bar","protocol":"kafka","description":""}],"groups":[{"name":"foo","generation":3,"members":[{"name":"m1","clientId":"client1","addr":"192.168.0.100","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:05+07:00","partitions":{"topic":[1,2,5]}},{"name":"m2","clientId":"client2","addr":"192.168.0.200","clientSoftwareName":"mokapi","clientSoftwareVersion":"1.0","heartbeat":"2024-04-22T15:04:10+07:00","partitions":{"topic":[3,4,6]}}],"leader":"m1","state":"PreparingRebalance","protocol":"range","topics":null}]}`, }, { name: "get specific with topic and openapi schema", @@ -297,7 +297,7 @@ func TestHandler_Kafka(t *testing.T) { return app }, requestUrl: "http://foo.api/api/services/kafka/foo", - responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"","addr":""},"segments":0}],"messages":{"foo":{"name":"foo","payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`, + responseBody: `{"name":"foo","description":"bar","version":"1.0","topics":[{"name":"foo","description":"bar","partitions":[{"id":0,"startOffset":0,"offset":0,"segments":0}],"messages":{"foo":{"name":"foo","payload":{"format":"foo","schema":{"type":"string"}},"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]}`, }, } @@ -379,7 +379,7 @@ func TestHandler_KafkaAPI(t *testing.T) { h, try.HasStatusCode(200), try.HasHeader("Content-Type", "application/json"), - try.HasBody(`[{"name":"topic-1","description":"foobar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"broker-1","addr":"localhost:9092"},"segments":0}],"messages":{"foo":{"name":"foo","payload":null,"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]`), + try.HasBody(`[{"name":"topic-1","description":"foobar","partitions":[{"id":0,"startOffset":0,"offset":0,"segments":0}],"messages":{"foo":{"name":"foo","payload":null,"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}]`), ) }, }, @@ -409,7 +409,7 @@ func TestHandler_KafkaAPI(t *testing.T) { h, try.HasStatusCode(200), try.HasHeader("Content-Type", "application/json"), - try.HasBody(`{"name":"topic-1","description":"foobar","partitions":[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"broker-1","addr":"localhost:9092"},"segments":0}],"messages":{"foo":{"name":"foo","payload":null,"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}`), + try.HasBody(`{"name":"topic-1","description":"foobar","partitions":[{"id":0,"startOffset":0,"offset":0,"segments":0}],"messages":{"foo":{"name":"foo","payload":null,"contentType":"application/json"}},"bindings":{"partitions":1,"valueSchemaValidation":true}}`), ) }, }, @@ -611,7 +611,7 @@ func TestHandler_KafkaAPI(t *testing.T) { "", h, try.HasStatusCode(http.StatusOK), - try.HasBody(`[{"id":0,"startOffset":0,"offset":0,"leader":{"name":"broker-1","addr":"localhost:9092"},"segments":0}]`), + try.HasBody(`[{"id":0,"startOffset":0,"offset":0,"segments":0}]`), ) }, }, @@ -640,7 +640,7 @@ func TestHandler_KafkaAPI(t *testing.T) { "", h, try.HasStatusCode(http.StatusOK), - try.HasBody(`{"id":0,"startOffset":0,"offset":0,"leader":{"name":"broker-1","addr":"localhost:9092"},"segments":0}`), + try.HasBody(`{"id":0,"startOffset":0,"offset":0,"segments":0}`), ) }, }, @@ -1068,7 +1068,6 @@ func getKafkaInfo(config *asyncapi3.Config) *runtime.KafkaInfo { func getKafkaInfoWithGroup(config *asyncapi3.Config, group *store.Group) *runtime.KafkaInfo { s := store.New(config, enginetest.NewEngine(), &eventstest.Handler{}, monitor.NewKafka()) g := s.GetOrCreateGroup(group.Name, 0) - group.Coordinator, _ = s.Broker(0) *g = *group return &runtime.KafkaInfo{ Config: config, diff --git a/kafka/client_context.go b/kafka/client_context.go index 1a6fd97a7..98322dc16 100644 --- a/kafka/client_context.go +++ b/kafka/client_context.go @@ -28,6 +28,7 @@ type ClientContext struct { Member map[string]string Close func() AllowAutoTopicCreation bool + ServerAddress string } func (c *ClientContext) AddGroup(groupName, memberId string) { @@ -54,6 +55,6 @@ func ClientFromContext(ctx context.Context) *ClientContext { return ctx.Value(clientKey).(*ClientContext) } -func NewClientContext(ctx context.Context, addr string) context.Context { - return context.WithValue(ctx, clientKey, &ClientContext{Addr: addr, AllowAutoTopicCreation: true, Heartbeat: time.Now()}) +func NewClientContext(ctx context.Context, addr, serverAddress string) context.Context { + return context.WithValue(ctx, clientKey, &ClientContext{Addr: addr, ServerAddress: serverAddress, AllowAutoTopicCreation: true, Heartbeat: time.Now()}) } diff --git a/kafka/kafkatest/kafkatest.go b/kafka/kafkatest/kafkatest.go index de1aa222e..e1171818c 100644 --- a/kafka/kafkatest/kafkatest.go +++ b/kafka/kafkatest/kafkatest.go @@ -29,7 +29,7 @@ func NewRequest(clientId string, version int16, msg kafka.Message) *kafka.Reques ClientId: clientId, }, Message: msg, - Context: kafka.NewClientContext(context.Background(), "127.0.0.1:42424"), + Context: kafka.NewClientContext(context.Background(), "127.0.0.1:42424", "127.0.0.1:9092"), } ctx := kafka.ClientFromContext(r.Context) ctx.ClientId = clientId diff --git a/kafka/server.go b/kafka/server.go index ab8b49a7a..da575b55a 100644 --- a/kafka/server.go +++ b/kafka/server.go @@ -173,7 +173,7 @@ func (s *Server) trackConn(conn net.Conn) context.Context { if s.activeConn == nil { s.activeConn = make(map[net.Conn]context.Context) } - ctx := NewClientContext(context.Background(), conn.RemoteAddr().String()) + ctx := NewClientContext(context.Background(), conn.RemoteAddr().String(), conn.LocalAddr().String()) s.activeConn[conn] = ctx return ctx diff --git a/providers/asyncapi3/kafka/store/find_coordinator.go b/providers/asyncapi3/kafka/store/find_coordinator.go index 8d76ef05d..b661977c9 100644 --- a/providers/asyncapi3/kafka/store/find_coordinator.go +++ b/providers/asyncapi3/kafka/store/find_coordinator.go @@ -26,23 +26,18 @@ func (s *Store) findCoordinator(rw kafka.ResponseWriter, req *kafka.Request) err if b == nil { return writeError(kafka.UnknownServerError, fmt.Sprintf("broker %v not found", req.Host)) } - g := s.GetOrCreateGroup(r.Key, b.Id) - if g.Coordinator == nil { - return writeError(kafka.CoordinatorNotAvailable, fmt.Sprintf("no coordinator for group %v available", r.Key)) - } else { - host := g.Coordinator.Host - if len(host) == 0 { - var err error - host, _, err = net.SplitHostPort(req.Host) - if err != nil { - return writeError(kafka.UnknownServerError, fmt.Sprintf("broker %v not found: %v", req.Host, err)) - } + host := b.Host + if len(host) == 0 { + var err error + host, _, err = net.SplitHostPort(req.Host) + if err != nil { + return writeError(kafka.UnknownServerError, fmt.Sprintf("broker %v not found: %v", req.Host, err)) } - - res.NodeId = int32(g.Coordinator.Id) - res.Host = host - res.Port = int32(g.Coordinator.Port) } + + res.NodeId = int32(b.Id) + res.Host = host + res.Port = int32(b.Port) default: res.ErrorCode = kafka.UnknownServerError res.ErrorMessage = fmt.Sprintf("unsupported request key_type=%v", r.KeyType) diff --git a/providers/asyncapi3/kafka/store/group.go b/providers/asyncapi3/kafka/store/group.go index fcc1fefc6..9f3a5d0fd 100644 --- a/providers/asyncapi3/kafka/store/group.go +++ b/providers/asyncapi3/kafka/store/group.go @@ -25,10 +25,9 @@ var states = [...]string{ } type Group struct { - Name string - Coordinator *Broker - State GroupState - Generation *Generation + Name string + State GroupState + Generation *Generation // todo add timestamp and metadata to commit Commits map[string]map[int]int64 @@ -38,8 +37,7 @@ type Group struct { func (s *Store) newGroup(name string, coordinator *Broker) *Group { g := &Group{ - Name: name, - Coordinator: coordinator, + Name: name, } g.balancer = newGroupBalancer(g, coordinator.kafkaConfig, &groupMonitor{cluster: s.cluster, monitor: s.monitor}) go g.balancer.run() diff --git a/providers/asyncapi3/kafka/store/joingroup.go b/providers/asyncapi3/kafka/store/joingroup.go index 6b33c6a97..603265c91 100644 --- a/providers/asyncapi3/kafka/store/joingroup.go +++ b/providers/asyncapi3/kafka/store/joingroup.go @@ -18,9 +18,6 @@ func (s *Store) joingroup(rw kafka.ResponseWriter, req *kafka.Request) error { } g := s.GetOrCreateGroup(r.GroupId, b.Id) - if g.Coordinator.Id != b.Id { - return rw.Write(&joinGroup.Response{ErrorCode: kafka.NotCoordinator}) - } ctx.AddGroup(g.Name, r.MemberId) diff --git a/providers/asyncapi3/kafka/store/log_cleaner.go b/providers/asyncapi3/kafka/store/log_cleaner.go index 08f2990b7..d55757501 100644 --- a/providers/asyncapi3/kafka/store/log_cleaner.go +++ b/providers/asyncapi3/kafka/store/log_cleaner.go @@ -38,7 +38,7 @@ func (s *Store) cleanLog(b *Broker) { } for _, p := range topic.Partitions { - if p.Leader.Id != b.Id { + if p.leader.Id != b.Id { continue } diff --git a/providers/asyncapi3/kafka/store/metadata.go b/providers/asyncapi3/kafka/store/metadata.go index 7c0928c6f..9fefdf549 100644 --- a/providers/asyncapi3/kafka/store/metadata.go +++ b/providers/asyncapi3/kafka/store/metadata.go @@ -71,21 +71,14 @@ func (s *Store) metadata(rw kafka.ResponseWriter, req *kafka.Request) error { Name: t.Name, } - for i, p := range t.Partitions { - replicas := p.Replicas - nodes := make([]int32, 0, len(replicas)) - for _, n := range replicas { - nodes = append(nodes, int32(n)) - } - brokerId := -1 - if p.Leader != nil { - brokerId = p.Leader.Id - } + brokerId := -1 + if b != nil { + brokerId = b.Id + } + for i := range t.Partitions { resTopic.Partitions = append(resTopic.Partitions, metaData.ResponsePartition{ PartitionIndex: int32(i), LeaderId: int32(brokerId), - ReplicaNodes: nodes, - IsrNodes: nodes, }) } diff --git a/providers/asyncapi3/kafka/store/partition.go b/providers/asyncapi3/kafka/store/partition.go index 868024e62..ecb020d36 100644 --- a/providers/asyncapi3/kafka/store/partition.go +++ b/providers/asyncapi3/kafka/store/partition.go @@ -22,8 +22,8 @@ type Partition struct { Tail int64 Topic *Topic - Leader *Broker - Replicas []int + // only for log cleaner + leader *Broker validator *validator logger LogRecord @@ -93,14 +93,8 @@ func newPartition(index int, brokers Brokers, logger LogRecord, trigger Trigger, producers: make(map[int64]*PartitionProducerState), } if len(brokerList) > 0 { - p.Leader = brokerList[0] + p.leader = brokerList[0] } - if len(brokerList) > 1 { - p.Replicas = brokerIds[1:] - } else { - p.Replicas = make([]int, 0) - } - return p } @@ -318,17 +312,6 @@ func (p *Partition) removeSegment(s *Segment) { delete(p.Segments, s.Head) } -func (p *Partition) removeReplica(id int) { - i := 0 - for _, replica := range p.Replicas { - if replica != id { - p.Replicas[i] = replica - i++ - } - } - p.Replicas = p.Replicas[:i] -} - func (p *Partition) addSegment() *Segment { p.m.RLock() defer p.m.RUnlock() diff --git a/providers/asyncapi3/kafka/store/partition_test.go b/providers/asyncapi3/kafka/store/partition_test.go index f15c955f9..98384d32c 100644 --- a/providers/asyncapi3/kafka/store/partition_test.go +++ b/providers/asyncapi3/kafka/store/partition_test.go @@ -24,8 +24,6 @@ func TestPartition(t *testing.T) { require.Equal(t, 0, p.Index) require.Equal(t, int64(0), p.StartOffset()) require.Equal(t, int64(0), p.Offset()) - require.Equal(t, 1, p.Leader.Id) - require.Equal(t, []int{}, p.Replicas) } func TestPartition_Write(t *testing.T) { diff --git a/providers/asyncapi3/kafka/store/produce_test.go b/providers/asyncapi3/kafka/store/produce_test.go index 5ffb9dbe1..1d1b72159 100644 --- a/providers/asyncapi3/kafka/store/produce_test.go +++ b/providers/asyncapi3/kafka/store/produce_test.go @@ -369,7 +369,7 @@ func TestProduce(t *testing.T) { ), )) hook := test.NewGlobal() - ctx := kafka.NewClientContext(context.Background(), "127.0.0.1:42424") + ctx := kafka.NewClientContext(context.Background(), "127.0.0.1:42424", "127.0.0.1:9092") sm.SetStore(5, events.NewTraits().WithNamespace("kafka")) rr := kafkatest.NewRecorder() @@ -435,7 +435,7 @@ func TestProduce(t *testing.T) { ), )) hook := test.NewGlobal() - ctx := kafka.NewClientContext(context.Background(), "127.0.0.1:42424") + ctx := kafka.NewClientContext(context.Background(), "127.0.0.1:42424", "127.0.0.1:9092") rr := kafkatest.NewRecorder() s.ServeMessage(rr, kafkatest.NewRequest("MOKAPITEST1", 3, &initProducerId.Request{}).WithContext(ctx)) @@ -493,7 +493,7 @@ func TestProduce(t *testing.T) { ), )) hook := test.NewGlobal() - ctx := kafka.NewClientContext(context.Background(), "127.0.0.1:42424") + ctx := kafka.NewClientContext(context.Background(), "127.0.0.1:42424", "127.0.0.1:9092") rr := kafkatest.NewRecorder() s.ServeMessage(rr, kafkatest.NewRequest("MOKAPITEST1", 3, &initProducerId.Request{}).WithContext(ctx)) diff --git a/providers/asyncapi3/kafka/store/store.go b/providers/asyncapi3/kafka/store/store.go index ec4f04c2b..5ff5228d9 100644 --- a/providers/asyncapi3/kafka/store/store.go +++ b/providers/asyncapi3/kafka/store/store.go @@ -302,11 +302,6 @@ func (s *Store) deleteBroker(id int) { s.m.Lock() defer s.m.Unlock() - for _, t := range s.topics { - for _, p := range t.Partitions { - p.removeReplica(id) - } - } if b, ok := s.brokers[id]; ok { b.stopCleaner() } diff --git a/providers/asyncapi3/kafka/store/syncgroup.go b/providers/asyncapi3/kafka/store/syncgroup.go index faa8df852..301f5a3ac 100644 --- a/providers/asyncapi3/kafka/store/syncgroup.go +++ b/providers/asyncapi3/kafka/store/syncgroup.go @@ -22,9 +22,6 @@ func (s *Store) syncgroup(rw kafka.ResponseWriter, req *kafka.Request) error { } g := s.GetOrCreateGroup(r.GroupId, b.Id) - if g.Coordinator.Id != b.Id { - return rw.Write(&syncGroup.Response{ErrorCode: kafka.NotCoordinator}) - } if g.State == PreparingRebalance { return rw.Write(&syncGroup.Response{ErrorCode: kafka.RebalanceInProgress}) diff --git a/webui/e2e/Dashboard/kafka/cluster.ts b/webui/e2e/Dashboard/kafka/cluster.ts index 7a87b045a..3b95a17c3 100644 --- a/webui/e2e/Dashboard/kafka/cluster.ts +++ b/webui/e2e/Dashboard/kafka/cluster.ts @@ -23,21 +23,18 @@ export const cluster = { partitions: [ { id: '0', - leader: 'foo (localhost:9002)', startOffset: '0', offset: '4', segments: '1' }, { id: '1', - leader: 'foo (localhost:9002)', startOffset: '0', offset: '3', segments: '1' }, { id: '2', - leader: 'foo (localhost:9002)', startOffset: '0', offset: '3', segments: '1' @@ -65,7 +62,6 @@ export const cluster = { partitions: [ { id: '0', - leader: 'foo (localhost:9002)', startOffset: '0', offset: '0', segments: '1' @@ -101,7 +97,6 @@ export const cluster = { name: 'foo', state: 'Stable', protocol: 'Range', - coordinator: 'localhost:9092', leader: 'julie', members: [ { @@ -124,7 +119,6 @@ export const cluster = { name: 'bar', state: 'Stable', protocol: 'Range', - coordinator: 'localhost:9092', leader: 'george', members: [ { diff --git a/webui/e2e/components/kafka.ts b/webui/e2e/components/kafka.ts index 968adbb5b..d8f1cc0c1 100644 --- a/webui/e2e/components/kafka.ts +++ b/webui/e2e/components/kafka.ts @@ -28,8 +28,6 @@ export interface Group { name: string state: string protocol: string - coordinator: string - leader: string members: { name: string address: string @@ -43,7 +41,7 @@ export function useKafkaGroups(table: Locator, topic?: string) { return { async testGroup(row: number, group: Group, lags?: string) { await test.step(`Check Kafka group in row ${row}`, async () => { - let columns = ['Name', 'State', 'Protocol', 'Coordinator', 'Leader', 'Members'] + let columns = ['Name', 'State', 'Protocol', 'Leader', 'Members'] if (lags) { columns.push('Lag') } @@ -53,8 +51,6 @@ export function useKafkaGroups(table: Locator, topic?: string) { await expect(g.getCellByName('Name')).toHaveText(group.name) await expect(g.getCellByName('State')).toHaveText(group.state) await expect(g.getCellByName('Protocol')).toHaveText(group.protocol) - await expect(g.getCellByName('Coordinator')).toHaveText(group.coordinator) - await expect(g.getCellByName('Leader')).toHaveText(group.leader) if (lags) { await expect(g.getCellByName('Lag')).toHaveText(lags) } @@ -79,20 +75,18 @@ export function useKafkaGroups(table: Locator, topic?: string) { export interface Partition { id: string - leader: string startOffset: string offset: string segments: string } export function useKafkaPartitions(table: Locator) { - const partitions = useTable(table, ['ID', 'Leader', 'Start Offset', 'Offset', 'Segments']) + const partitions = useTable(table, ['ID', 'Start Offset', 'Offset', 'Segments']) return { async testPartition(row: number, partition: Partition) { await test.step(`Check Kafka partition in row ${row}`, async () => { const p = partitions.getRow(row + 1) await expect(p.getCellByName('ID')).toHaveText(partition.id) - await expect(p.getCellByName('Leader')).toHaveText(partition.leader) await expect(p.getCellByName('Start Offset')).toHaveText(partition.startOffset) await expect(p.getCellByName('Offset')).toHaveText(partition.offset) await expect(p.getCellByName('Segments')).toHaveText(partition.segments) diff --git a/webui/e2e/dashboard-demo/kafka.spec.ts b/webui/e2e/dashboard-demo/kafka.spec.ts index 709241299..86834ba32 100644 --- a/webui/e2e/dashboard-demo/kafka.spec.ts +++ b/webui/e2e/dashboard-demo/kafka.spec.ts @@ -52,7 +52,6 @@ test('Visit Kafka Order Service', async ({ page }) => { await expect(await getCellByColumnName(table, 'Name', rows.nth(0))).toHaveText('order-status-group-100'); await expect(await getCellByColumnName(table, 'State', rows.nth(0))).toHaveText('Stable'); await expect(await getCellByColumnName(table, 'Protocol', rows.nth(0))).toHaveText('RoundRobinAssigner'); - await expect(await getCellByColumnName(table, 'Coordinator', rows.nth(0))).toHaveText('localhost:9092'); await expect(await getCellByColumnName(table, 'Leader', rows.nth(0))).toHaveText(/^consumer-1/); const members = await getCellByColumnName(table, 'Members', rows.nth(0)) await expect(members).toHaveText(/^consumer-1/); @@ -69,7 +68,6 @@ test('Visit Kafka Order Service', async ({ page }) => { await expect(page.getByLabel('Group Name')).toHaveText('order-status-group-100'); await expect(page.getByLabel('State')).toHaveText('Stable'); await expect(page.getByLabel('Protocol')).toHaveText('RoundRobinAssigner'); - await expect(page.getByLabel('Coordinator')).toHaveText('localhost:9092'); await expect(page.getByLabel('Generation', { exact: true })).toHaveText('0'); await test.step('Verify Members', async () => { @@ -172,7 +170,6 @@ test('Visit Kafka Order Service', async ({ page }) => { const rows = table.locator('tbody tr'); await expect(rows).toHaveCount(1); await expect(await getCellByColumnName(table, 'ID')).toHaveText('0'); - await expect(await getCellByColumnName(table, 'Leader')).toHaveText('development (localhost:9092)'); await expect(await getCellByColumnName(table, 'Start Offset')).toHaveText('0'); await expect(await getCellByColumnName(table, 'Offset')).toHaveText('2'); await expect(await getCellByColumnName(table, 'Segments')).toHaveText('1'); @@ -189,7 +186,6 @@ test('Visit Kafka Order Service', async ({ page }) => { await expect(await getCellByColumnName(table, 'Name')).toHaveText('order-status-group-100'); await expect(await getCellByColumnName(table, 'State')).toHaveText('Stable'); await expect(await getCellByColumnName(table, 'Protocol')).toHaveText('RoundRobinAssigner'); - await expect(await getCellByColumnName(table, 'Coordinator')).toHaveText('localhost:9092'); await expect(await getCellByColumnName(table, 'Leader')).toHaveText(/^consumer-1/); await expect(await getCellByColumnName(table, 'Members')).toContainText(/^consumer-1/); await expect(await getCellByColumnName(table, 'Lag')).toHaveText('0'); diff --git a/webui/src/components/dashboard/kafka/KafkaClient.vue b/webui/src/components/dashboard/kafka/KafkaClient.vue index dd9e1bb28..5cfa479cf 100644 --- a/webui/src/components/dashboard/kafka/KafkaClient.vue +++ b/webui/src/components/dashboard/kafka/KafkaClient.vue @@ -3,7 +3,7 @@ import { getRouteName, useDashboard } from '@/composables/dashboard'; import { useKafka } from '@/composables/kafka'; import { useRoute, useRouter } from '@/router'; import { computed, type Ref } from 'vue'; -import Message from './Message.vue'; +import Message from '../../Message.vue'; const route = useRoute(); const router = useRouter(); @@ -80,6 +80,10 @@ function gotToMember(memberId: string, groupName: string, openInNewTab = false){

Address

{{ formatAddress(client.address) }}

+
+

Broker

+

{{ formatAddress(client.brokerAddress) }}

+

Client Software

{{ clientSoftware(client) }}

diff --git a/webui/src/components/dashboard/kafka/KafkaGroup.vue b/webui/src/components/dashboard/kafka/KafkaGroup.vue index 2633e6984..cf50ddd41 100644 --- a/webui/src/components/dashboard/kafka/KafkaGroup.vue +++ b/webui/src/components/dashboard/kafka/KafkaGroup.vue @@ -3,14 +3,14 @@ import { getRouteName, useDashboard } from '@/composables/dashboard'; import { usePrettyDates } from '@/composables/usePrettyDate'; import { useRoute, useRouter } from '@/router'; import { computed, type Ref } from 'vue'; -import Message from './Message.vue'; +import Message from '../../Message.vue'; import { useKafka } from '@/composables/kafka'; import { useMetrics } from '@/composables/metrics'; const route = useRoute(); const router = useRouter(); const { format } = usePrettyDates(); -const { clientSoftware } = useKafka(); +const { clientSoftware, formatAddress } = useKafka(); const { dashboard } = useDashboard(); const { value } = useMetrics(); @@ -23,7 +23,7 @@ const group = computed(() => { if (!service.value) { return null; } - for (let group of service.value?.groups){ + for (let group of service.value.groups){ if (group.name == groupName) { return group; } @@ -98,12 +98,8 @@ function goToMember(member: KafkaMember, openInNewTab = false){

{{ group.protocol }}

-

Generation

-

{{ group.generation }}

-
-
-

Coordinator

-

{{ group.coordinator }}

+

Generation

+

{{ group.generation }}

Last Rebalancing

@@ -144,7 +140,7 @@ function goToMember(member: KafkaMember, openInNewTab = false){ {{ member.name }} - {{ member.addr }} + {{ formatAddress(member.addr) }} {{ clientSoftware(member) }} {{ format(member.heartbeat) }} @@ -154,7 +150,7 @@ function goToMember(member: KafkaMember, openInNewTab = false){
-
+
\ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaGroupMember.vue b/webui/src/components/dashboard/kafka/KafkaGroupMember.vue index 857f3ba0c..e6e789aa9 100644 --- a/webui/src/components/dashboard/kafka/KafkaGroupMember.vue +++ b/webui/src/components/dashboard/kafka/KafkaGroupMember.vue @@ -4,7 +4,7 @@ import { useKafka } from '@/composables/kafka'; import { usePrettyDates } from '@/composables/usePrettyDate'; import { useRoute, useRouter } from '@/router'; import { computed, type Ref } from 'vue'; -import Message from './Message.vue'; +import Message from '../../Message.vue'; import { useMetrics } from '@/composables/metrics'; interface Partition { diff --git a/webui/src/components/dashboard/kafka/KafkaGroups.vue b/webui/src/components/dashboard/kafka/KafkaGroups.vue index 214a63353..d75fd2f60 100644 --- a/webui/src/components/dashboard/kafka/KafkaGroups.vue +++ b/webui/src/components/dashboard/kafka/KafkaGroups.vue @@ -15,7 +15,7 @@ const props = defineProps<{ const router = useRouter() const { format } = usePrettyDates() const { sum } = useMetrics() -const { clientSoftware } = useKafka(); +const { clientSoftware, formatAddress } = useKafka(); function memberInfo(member: KafkaMember): string { let addition = '' @@ -27,7 +27,7 @@ function memberInfo(member: KafkaMember): string { } return `

Address

-

${member.addr}

+

${formatAddress(member.addr)}

Client Software

${clientSoftware(member)}

Last Heartbeat

@@ -95,7 +95,6 @@ function goToGroup(group: KafkaGroup, openInNewTab = false){ Name State Protocol - Coordinator Leader Members Lag @@ -110,7 +109,6 @@ function goToGroup(group: KafkaGroup, openInNewTab = false){ {{ group.state }} - {{ group.leader }}
    diff --git a/webui/src/components/dashboard/kafka/KafkaPartition.vue b/webui/src/components/dashboard/kafka/KafkaPartition.vue index e8258f2fe..1ea7c7f59 100644 --- a/webui/src/components/dashboard/kafka/KafkaPartition.vue +++ b/webui/src/components/dashboard/kafka/KafkaPartition.vue @@ -11,7 +11,6 @@ defineProps<{ ID - Leader Start Offset Offset Segments @@ -20,7 +19,6 @@ defineProps<{ {{ partition.id }} - {{ partition.leader.name }} ({{ partition.leader.addr }}) {{ partition.startOffset }} {{ partition.offset }} {{ partition.segments }} diff --git a/webui/src/types/kafka.d.ts b/webui/src/types/kafka.d.ts index 149e3b59e..fc6619198 100644 --- a/webui/src/types/kafka.d.ts +++ b/webui/src/types/kafka.d.ts @@ -39,7 +39,6 @@ declare interface KafkaPartition { id: number; startOffset: number; offset: number; - leader: KafkaBroker; segments: number; } @@ -52,7 +51,6 @@ declare interface KafkaGroup { name: string; generation: number members: KafkaMember[]; - coordinator: string; leader: string; state: string; protocol: string; @@ -98,6 +96,7 @@ declare interface KafkaValue { declare interface KafkaClient { clientId: string address: string + brokerAddress: string clientSoftwareName: string; clientSoftwareVersion: string; groups: { From 3247494e1d3720bbf68289928793308a20842a15 Mon Sep 17 00:00:00 2001 From: maesi Date: Sun, 25 Jan 2026 20:41:57 +0100 Subject: [PATCH 07/85] fix remove dynamic doc entry --- docs/config.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/config.json b/docs/config.json index eaa35c51a..9f8589d3a 100644 --- a/docs/config.json +++ b/docs/config.json @@ -48,8 +48,7 @@ "Static": { "expanded": true, "items": { - "CLI Usage": "configuration/static/cli.md", - "CLI Flags": "configuration/static/mokapi.md" + "CLI Usage": "configuration/static/cli.md" } }, "Dynamic": { From bb2b50e1233421cb916b605c938b8e67b4aff083 Mon Sep 17 00:00:00 2001 From: maesi Date: Sun, 25 Jan 2026 22:36:19 +0100 Subject: [PATCH 08/85] fix responsiveness --- webui/src/assets/dashboard.css | 26 ++++---- .../components/dashboard/kafka/KafkaGroup.vue | 66 ++++++++++--------- webui/src/views/DashboardView.vue | 12 ++-- 3 files changed, 54 insertions(+), 50 deletions(-) diff --git a/webui/src/assets/dashboard.css b/webui/src/assets/dashboard.css index 4ef3a83e9..c4f7c4d3b 100644 --- a/webui/src/assets/dashboard.css +++ b/webui/src/assets/dashboard.css @@ -6,7 +6,15 @@ --datatable-border-color: rgb(223,223,223); --datatable-background-active: rgb(240, 240, 240); } - +.dashboard h1{ + margin-top: 0; +} +.dashboard-content { + margin-top: 4rem; +} +.dashboard-content.demo { + margin-top: 10rem; +} .dashboard .card-group .card { border-color: var(--color-border); background-color: var(--color-background-soft); @@ -90,8 +98,10 @@ .header-demo { position: fixed; top: 4rem; + left: 2rem; + right: 2rem; z-index: 10; - width: 100%; + padding-right: 2rem; padding-top: 15px; padding-bottom: 30px; background-color: var(--color-background); @@ -131,12 +141,6 @@ border-bottom: 4px solid; margin-bottom: -3px; } -.dashboard-content { - margin-top: 4rem; -} -.dashboard-content.demo { - margin-top: 10rem; -} .dashboard a { text-decoration: none; } @@ -178,10 +182,10 @@ } @media only screen and (max-width: 450px) { - .dashboard-tabs.demo { - top: 10.8rem; -} .dashboard-content.demo { margin-top: 10.8rem; } + .dashboard-tabs.demo { + margin-top: 0.8rem; + } } \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaGroup.vue b/webui/src/components/dashboard/kafka/KafkaGroup.vue index cf50ddd41..983fc1d89 100644 --- a/webui/src/components/dashboard/kafka/KafkaGroup.vue +++ b/webui/src/components/dashboard/kafka/KafkaGroup.vue @@ -113,39 +113,41 @@ function goToMember(member: KafkaMember, openInNewTab = false){

    Members

    - - - - - - - - +
    +
    - Group leader - NameAddressClient SoftwareHeartbeat
    + + + + + + + + + + + + + + + + - - - - - - - - - - -
    + Group leader + NameAddressClient SoftwareHeartbeat
    + + + + + {{ member.name }} + + {{ formatAddress(member.addr) }}{{ clientSoftware(member) }}{{ format(member.heartbeat) }}
    - - - - - {{ member.name }} - - {{ formatAddress(member.addr) }}{{ clientSoftware(member) }}{{ format(member.heartbeat) }}
    + + +
diff --git a/webui/src/views/DashboardView.vue b/webui/src/views/DashboardView.vue index 08976b10d..6cbcfc7f3 100644 --- a/webui/src/views/DashboardView.vue +++ b/webui/src/views/DashboardView.vue @@ -94,13 +94,11 @@ useMeta('Dashboard | mokapi.io', description, '')

Dashboard

-
-
-

Demo Dashboard

-

- Get a feel for the interface and explore recorded data. -

-
+
+

Demo Dashboard

+

+ Get a feel for the interface and explore recorded data. +

From 4ad8cb7242fd2fa0a13a5ae92c85a7aebf99af37 Mon Sep 17 00:00:00 2001 From: maesi Date: Sun, 25 Jan 2026 22:57:49 +0100 Subject: [PATCH 09/85] fix responsiveness --- webui/src/views/DashboardView.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/webui/src/views/DashboardView.vue b/webui/src/views/DashboardView.vue index 6cbcfc7f3..23dd3d7eb 100644 --- a/webui/src/views/DashboardView.vue +++ b/webui/src/views/DashboardView.vue @@ -94,7 +94,7 @@ useMeta('Dashboard | mokapi.io', description, '')

Dashboard

-
+

Demo Dashboard

Get a feel for the interface and explore recorded data. From 0845efde8148da4b9123aca20a0a673cdb5b4576 Mon Sep 17 00:00:00 2001 From: maesi Date: Sun, 25 Jan 2026 23:36:48 +0100 Subject: [PATCH 10/85] fix responsiveness --- .../dashboard/kafka/KafkaClient.vue | 6 +- .../dashboard/kafka/KafkaClients.vue | 4 -- .../components/dashboard/kafka/KafkaGroup.vue | 8 +-- .../dashboard/kafka/KafkaGroupMember.vue | 63 ++++++++++--------- 4 files changed, 39 insertions(+), 42 deletions(-) diff --git a/webui/src/components/dashboard/kafka/KafkaClient.vue b/webui/src/components/dashboard/kafka/KafkaClient.vue index 5cfa479cf..730300f85 100644 --- a/webui/src/components/dashboard/kafka/KafkaClient.vue +++ b/webui/src/components/dashboard/kafka/KafkaClient.vue @@ -76,15 +76,15 @@ function gotToMember(memberId: string, groupName: string, openInNewTab = false){

-
+

Address

{{ formatAddress(client.address) }}

-
+

Broker

{{ formatAddress(client.brokerAddress) }}

-
+

Client Software

{{ clientSoftware(client) }}

diff --git a/webui/src/components/dashboard/kafka/KafkaClients.vue b/webui/src/components/dashboard/kafka/KafkaClients.vue index 7faf6c774..603df38c8 100644 --- a/webui/src/components/dashboard/kafka/KafkaClients.vue +++ b/webui/src/components/dashboard/kafka/KafkaClients.vue @@ -1,8 +1,6 @@ @@ -11,7 +12,7 @@ defineProps<{

Recent Messages

- +
\ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/Message.vue b/webui/src/components/dashboard/kafka/Message.vue index 71d226f9e..96cf6b9e9 100644 --- a/webui/src/components/dashboard/kafka/Message.vue +++ b/webui/src/components/dashboard/kafka/Message.vue @@ -200,6 +200,40 @@ function isNumber(value: string): boolean {

Partition

{{ data.partition }}

+
+

Key Type

+

{{ message?.keyType ?? '-' }}

+
+
+

Time

+

{{ format(event.time) }}

+
+
+
+
+

Client

+

+ + {{ data.clientId }} + + + {{ data.clientId }} + + - +

+
+
+

Content Type

+

{{ message?.contentTypeTitle ?? '-' }}

+
+
+

Producer Id

{{ data.producerId }}

@@ -213,22 +247,6 @@ function isNumber(value: string): boolean {

{{ data.sequenceNumber }}

-
-
-

Content Type

-

{{ message?.contentTypeTitle ?? '-' }}

-
-
-

Key Type

-

{{ message?.keyType ?? '-' }}

-
-
-
-
-

Time

-

{{ format(event.time) }}

-
-
diff --git a/webui/src/types/kafka.d.ts b/webui/src/types/kafka.d.ts index fc6619198..e7138cc95 100644 --- a/webui/src/types/kafka.d.ts +++ b/webui/src/types/kafka.d.ts @@ -79,6 +79,8 @@ declare interface KafkaEventData { producerId: number producerEpoch: number sequenceNumber: number + clientId: string + script: string } declare interface KafkaHeader { [name: string]: KafkaHeaderValue } From 6019a8e050cd5821f470a0595981299bbcb2dd7d Mon Sep 17 00:00:00 2001 From: maesi Date: Mon, 26 Jan 2026 21:39:36 +0100 Subject: [PATCH 15/85] adjust test --- webui/e2e/dashboard-demo/kafka.spec.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/webui/e2e/dashboard-demo/kafka.spec.ts b/webui/e2e/dashboard-demo/kafka.spec.ts index d3f12fc6f..3bcde8448 100644 --- a/webui/e2e/dashboard-demo/kafka.spec.ts +++ b/webui/e2e/dashboard-demo/kafka.spec.ts @@ -182,11 +182,6 @@ test('Visit Kafka Order Service', async ({ page }) => { await expect(page.getByLabel('Key Type')).toHaveText('-'); await expect(page.getByLabel('Key Type')).not.toBeEmpty(); await expect(page.getByLabel('Client')).toHaveText('mokapi-script'); - - const value = page.getByRole('region', { name: 'Value' }); - await expect(value.getByLabel('Content Type')).toHaveText('application/json'); - await expect(value.getByLabel('Lines of Code')).toHaveText('8 lines'); - await expect(value.getByLabel('Size of Code')).toHaveText('234 B'); await test.step('Verify Producer Script', async () => { await page.getByLabel('Client').getByRole('link').click(); From 096912a491a5dd3c9478a8a735ad8d3e055b0ff1 Mon Sep 17 00:00:00 2001 From: maesi Date: Mon, 26 Jan 2026 21:54:44 +0100 Subject: [PATCH 16/85] fix test --- webui/e2e/dashboard-demo/kafka.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/webui/e2e/dashboard-demo/kafka.spec.ts b/webui/e2e/dashboard-demo/kafka.spec.ts index 3bcde8448..2a8d66a0b 100644 --- a/webui/e2e/dashboard-demo/kafka.spec.ts +++ b/webui/e2e/dashboard-demo/kafka.spec.ts @@ -185,7 +185,7 @@ test('Visit Kafka Order Service', async ({ page }) => { await test.step('Verify Producer Script', async () => { await page.getByLabel('Client').getByRole('link').click(); - await expect(page.getByLabel('URL')).toHaveText(/demo-configs\/kafka.ts$/); + await expect(page.getByLabel('URL')).toHaveText(/kafka.ts$/); await page.goBack(); }) From 7e4da42cb52b63d97b9d7c56c589b34b06939c37 Mon Sep 17 00:00:00 2001 From: maesi Date: Tue, 27 Jan 2026 10:15:55 +0100 Subject: [PATCH 17/85] change view of kafka cluster for better UX --- webui/e2e/Dashboard/kafka/cluster.spec.ts | 18 ++- webui/e2e/dashboard-demo/kafka.spec.ts | 84 ++++++----- webui/src/components/dashboard/ConfigCard.vue | 120 +-------------- webui/src/components/dashboard/Configs.vue | 96 ++++++++++++ .../dashboard/kafka/KafkaClients.vue | 60 ++++---- .../dashboard/kafka/KafkaGroupsCard.vue | 17 --- .../dashboard/kafka/KafkaService.vue | 138 ++++++++++++------ .../dashboard/kafka/KafkaTopics.vue | 77 ++++++++++ .../dashboard/kafka/KafkaTopicsCard.vue | 76 ---------- .../components/dashboard/kafka/Servers.vue | 66 ++++----- 10 files changed, 393 insertions(+), 359 deletions(-) create mode 100644 webui/src/components/dashboard/Configs.vue delete mode 100644 webui/src/components/dashboard/kafka/KafkaGroupsCard.vue create mode 100644 webui/src/components/dashboard/kafka/KafkaTopics.vue delete mode 100644 webui/src/components/dashboard/kafka/KafkaTopicsCard.vue diff --git a/webui/e2e/Dashboard/kafka/cluster.spec.ts b/webui/e2e/Dashboard/kafka/cluster.spec.ts index ed3b71508..d5dbfac42 100644 --- a/webui/e2e/Dashboard/kafka/cluster.spec.ts +++ b/webui/e2e/Dashboard/kafka/cluster.spec.ts @@ -27,7 +27,9 @@ test('Visit Kafka cluster "Kafka World"', async ({ page }) => { }) await test.step('Check broker section', async () => { - const brokers = useTable(page.getByRole('region', { name: "Brokers" }).getByRole('table', { name: 'Brokers' }), ['Name', 'Host', 'Description', 'Tags']) + await page.getByRole('tab', { name: 'Servers' }).click(); + + const brokers = useTable(page.getByRole('table', { name: 'Servers' }), ['Name', 'Host', 'Description', 'Tags']) const broker = brokers.getRow(1) await expect(broker.getCellByName('Name')).toHaveText(cluster.brokers[0].name) await expect(broker.getCellByName('Host')).toHaveText(cluster.brokers[0].url) @@ -36,7 +38,9 @@ test('Visit Kafka cluster "Kafka World"', async ({ page }) => { }) await test.step('Check topic section', async () => { - const table = page.getByRole('region', { name: "Topics" }).getByRole('table', { name: 'Topics' }) + await page.getByRole('tab', { name: 'Topics' }).click(); + + const table = page.getByRole('table', { name: 'Topics' }) await expect(table).toBeVisible() const topics = useKafkaTopics(table) await topics.testTopic(0, cluster.topics[0]) @@ -44,7 +48,9 @@ test('Visit Kafka cluster "Kafka World"', async ({ page }) => { }) await test.step('Check groups section', async () => { - const table = page.getByRole('region', { name: "Groups" }).getByRole('table', { name: 'Groups' }) + await page.getByRole('tab', { name: 'Groups' }).click(); + + const table = page.getByRole('table', { name: 'Groups' }) await expect(table).toBeVisible() const groups = useKafkaGroups(table) await groups.testGroup(0, cluster.groups[0]) @@ -52,7 +58,9 @@ test('Visit Kafka cluster "Kafka World"', async ({ page }) => { }) await test.step('Check config section', async () => { - const configs = useTable(page.getByRole('region', { name: "Configs" }).getByRole('table', { name: 'Configs' }), ['URL', 'Provider', 'Last Update']) + await page.getByRole('tab', { name: 'Configs' }).click(); + + const configs = useTable(page.getByRole('table', { name: 'Configs' }), ['URL', 'Provider', 'Last Update']) const config = configs.getRow(1) await expect(config.getCellByName('URL')).toHaveText('https://www.example.com/foo/bar/communication/service/asyncapi.json') await expect(config.getCellByName('Provider')).toHaveText('HTTP') @@ -70,6 +78,8 @@ test('Visit Kafka cluster config file', async ({ page, context }) => { await tabs.kafka.click() await page.getByRole('table', { name: 'Kafka Clusters' }).getByText(cluster.name).click() + + await page.getByRole('tab', { name: 'Configs' }).click(); await page.getByRole('table', { name: 'Configs' }).getByText('https://www.example.com/foo/bar/communication/service/asyncapi.json').click() await expect(page.getByLabel('URL')).toHaveText('https://www.example.com/foo/bar/communication/service/asyncapi.json') diff --git a/webui/e2e/dashboard-demo/kafka.spec.ts b/webui/e2e/dashboard-demo/kafka.spec.ts index 2a8d66a0b..16b1e977c 100644 --- a/webui/e2e/dashboard-demo/kafka.spec.ts +++ b/webui/e2e/dashboard-demo/kafka.spec.ts @@ -18,57 +18,57 @@ test('Visit Kafka Order Service', async ({ page }) => { }); - await test.step('Verify Brokers', async () => { + await test.step('Verify Servers', async () => { - await expect(page.getByRole('region', { name: 'Brokers' })).toBeVisible(); - const table = page.getByRole('table', { name: 'Brokers' }); - const rows = table.locator('tbody tr'); - await expect(rows).toHaveCount(1); - await expect(await getCellByColumnName(table, 'Name', rows.nth(0))).toHaveText('development'); - await expect(await getCellByColumnName(table, 'Host', rows.nth(0))).toHaveText('localhost:9092'); - await expect(await getCellByColumnName(table, 'Description', rows.nth(0))).toHaveText('Local development Kafka broker.'); + await page.getByRole('tab', { name: 'Servers' }).click(); + const table = page.getByRole('table', { name: 'Servers' }); + const rows = table.locator('tbody tr'); + await expect(rows).toHaveCount(1); + await expect(await getCellByColumnName(table, 'Name', rows.nth(0))).toHaveText('development'); + await expect(await getCellByColumnName(table, 'Host', rows.nth(0))).toHaveText('localhost:9092'); + await expect(await getCellByColumnName(table, 'Description', rows.nth(0))).toHaveText('Local development Kafka broker.'); }); await test.step('Verify Topics', async () => { - await expect(page.getByRole('region', { name: 'Topics' })).toBeVisible(); - const table = page.getByRole('table', { name: 'Topics' }); - const rows = table.locator('tbody tr'); - await expect(rows).toHaveCount(1); - await expect(await getCellByColumnName(table, 'Name', rows.nth(0))).toHaveText('order-topic'); - await expect(await getCellByColumnName(table, 'Description', rows.nth(0))).toHaveText('The Kafka topic for order events.'); - await expect(await getCellByColumnName(table, 'Last Message', rows.nth(0))).not.toHaveText('-'); - await expect(await getCellByColumnName(table, 'Messages', rows.nth(0))).toHaveText('2'); + await page.getByRole('tab', { name: 'Topics' }).click(); + const table = page.getByRole('table', { name: 'Topics' }); + const rows = table.locator('tbody tr'); + await expect(rows).toHaveCount(1); + await expect(await getCellByColumnName(table, 'Name', rows.nth(0))).toHaveText('order-topic'); + await expect(await getCellByColumnName(table, 'Description', rows.nth(0))).toHaveText('The Kafka topic for order events.'); + await expect(await getCellByColumnName(table, 'Last Message', rows.nth(0))).not.toHaveText('-'); + await expect(await getCellByColumnName(table, 'Messages', rows.nth(0))).toHaveText('2'); }); await test.step('Verify Groups', async () => { - await expect(page.getByRole('region', { name: 'Groups' })).toBeVisible(); - const table = page.getByRole('table', { name: 'Groups' }); - const rows = table.locator('tbody tr'); - await expect(rows).toHaveCount(1); - await expect(await getCellByColumnName(table, 'Name', rows.nth(0))).toHaveText('order-status-group-100'); - await expect(await getCellByColumnName(table, 'State', rows.nth(0))).toHaveText('Stable'); - await expect(await getCellByColumnName(table, 'Protocol', rows.nth(0))).toHaveText('RoundRobinAssigner'); - await expect(await getCellByColumnName(table, 'Leader', rows.nth(0))).toHaveText(/^consumer-1/); - const members = await getCellByColumnName(table, 'Members', rows.nth(0)) - await expect(members).toHaveText(/^consumer-1/); - - await members.hover(); - const tooltip = page.getByRole('tooltip') - await expect(tooltip).toBeVisible(); - await expect(tooltip.getByLabel('Address')).not.toBeEmpty(); - await expect(tooltip.getByLabel('Client Software')).toHaveText('-'); - await expect(tooltip.getByLabel('Last Heartbeat')).not.toBeEmpty(); - await expect(tooltip.getByLabel('Topics')).toHaveText('order-topic'); - - await rows.nth(0).getByRole('cell').nth(0).click(); - await expect(page.getByLabel('Group Name')).toHaveText('order-status-group-100'); - await expect(page.getByLabel('State')).toHaveText('Stable'); - await expect(page.getByLabel('Protocol')).toHaveText('RoundRobinAssigner'); - await expect(page.getByLabel('Generation', { exact: true })).toHaveText('0'); + await page.getByRole('tab', { name: 'Groups' }).click(); + const table = page.getByRole('table', { name: 'Groups' }); + const rows = table.locator('tbody tr'); + await expect(rows).toHaveCount(1); + await expect(await getCellByColumnName(table, 'Name', rows.nth(0))).toHaveText('order-status-group-100'); + await expect(await getCellByColumnName(table, 'State', rows.nth(0))).toHaveText('Stable'); + await expect(await getCellByColumnName(table, 'Protocol', rows.nth(0))).toHaveText('RoundRobinAssigner'); + await expect(await getCellByColumnName(table, 'Leader', rows.nth(0))).toHaveText(/^consumer-1/); + const members = await getCellByColumnName(table, 'Members', rows.nth(0)) + await expect(members).toHaveText(/^consumer-1/); + + await members.hover(); + const tooltip = page.getByRole('tooltip') + await expect(tooltip).toBeVisible(); + await expect(tooltip.getByLabel('Address')).not.toBeEmpty(); + await expect(tooltip.getByLabel('Client Software')).toHaveText('-'); + await expect(tooltip.getByLabel('Last Heartbeat')).not.toBeEmpty(); + await expect(tooltip.getByLabel('Topics')).toHaveText('order-topic'); + + await rows.nth(0).getByRole('cell').nth(0).click(); + await expect(page.getByLabel('Group Name')).toHaveText('order-status-group-100'); + await expect(page.getByLabel('State')).toHaveText('Stable'); + await expect(page.getByLabel('Protocol')).toHaveText('RoundRobinAssigner'); + await expect(page.getByLabel('Generation', { exact: true })).toHaveText('0'); await test.step('Verify Members', async () => { @@ -111,9 +111,12 @@ test('Visit Kafka Order Service', async ({ page }) => { }); await test.step('Verify Configs', async () => { + + await page.getByRole('tab', { name: 'Configs' }).click(); const table = page.getByRole('table', { name: 'Configs' }); await expect(await getCellByColumnName(table, 'URL')).toContainText('/asyncapi.yaml'); await expect(await getCellByColumnName(table, 'Provider')).toHaveText('File'); + }); await test.step('Verify Recent Messages', async () => { @@ -133,6 +136,7 @@ test('Visit Kafka Order Service', async ({ page }) => { await test.step('Visit Kafka Topic', async () => { + await page.getByRole('tab', { name: 'Topics' }).click(); await page.getByRole('table', { name: 'Topics' }).getByText('order-topic').click(); await expect(page.getByLabel('Topic', { exact: true })).toHaveText('order-topic'); await expect(page.getByLabel('Cluster')).toHaveText('Kafka Order Service API'); diff --git a/webui/src/components/dashboard/ConfigCard.vue b/webui/src/components/dashboard/ConfigCard.vue index 058d87060..f7078d9b1 100644 --- a/webui/src/components/dashboard/ConfigCard.vue +++ b/webui/src/components/dashboard/ConfigCard.vue @@ -1,129 +1,21 @@ \ No newline at end of file diff --git a/webui/src/components/dashboard/Configs.vue b/webui/src/components/dashboard/Configs.vue new file mode 100644 index 000000000..f6c53a315 --- /dev/null +++ b/webui/src/components/dashboard/Configs.vue @@ -0,0 +1,96 @@ + + + \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaClients.vue b/webui/src/components/dashboard/kafka/KafkaClients.vue index 603df38c8..c79d2194e 100644 --- a/webui/src/components/dashboard/kafka/KafkaClients.vue +++ b/webui/src/components/dashboard/kafka/KafkaClients.vue @@ -22,7 +22,7 @@ const clients = computed(() => { }) }) -onMounted(()=> { +onMounted(() => { const elements = document.querySelectorAll('.has-popover') const popovers = [...elements].map(x => { new Popover(x, { @@ -35,7 +35,7 @@ onMounted(()=> { }) }) -function goToClient(client: KafkaClient, openInNewTab = false){ +function goToClient(client: KafkaClient, openInNewTab = false) { if (getSelection()?.toString()) { return } @@ -43,8 +43,8 @@ function goToClient(client: KafkaClient, openInNewTab = false){ const to = { name: getRouteName('kafkaClient').value, params: { - service: props.service.name, - clientId: client.clientId, + service: props.service.name, + clientId: client.clientId, } } if (openInNewTab) { @@ -57,32 +57,28 @@ function goToClient(client: KafkaClient, openInNewTab = false){ \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaGroupsCard.vue b/webui/src/components/dashboard/kafka/KafkaGroupsCard.vue deleted file mode 100644 index e95a10948..000000000 --- a/webui/src/components/dashboard/kafka/KafkaGroupsCard.vue +++ /dev/null @@ -1,17 +0,0 @@ - - - \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaService.vue b/webui/src/components/dashboard/kafka/KafkaService.vue index 6b2d36957..3cc3981fa 100644 --- a/webui/src/components/dashboard/kafka/KafkaService.vue +++ b/webui/src/components/dashboard/kafka/KafkaService.vue @@ -1,24 +1,27 @@ \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaTopics.vue b/webui/src/components/dashboard/kafka/KafkaTopics.vue new file mode 100644 index 000000000..65486cdc6 --- /dev/null +++ b/webui/src/components/dashboard/kafka/KafkaTopics.vue @@ -0,0 +1,77 @@ + + + \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaTopicsCard.vue b/webui/src/components/dashboard/kafka/KafkaTopicsCard.vue deleted file mode 100644 index d2dad5e0f..000000000 --- a/webui/src/components/dashboard/kafka/KafkaTopicsCard.vue +++ /dev/null @@ -1,76 +0,0 @@ - - - \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/Servers.vue b/webui/src/components/dashboard/kafka/Servers.vue index 819f68ed8..719eff19c 100644 --- a/webui/src/components/dashboard/kafka/Servers.vue +++ b/webui/src/components/dashboard/kafka/Servers.vue @@ -7,7 +7,7 @@ const props = defineProps<{ servers: KafkaServer[], }>() -onMounted(()=> { +onMounted(() => { const elements = document.querySelectorAll('.has-popover') const popovers = [...elements].map(x => { new Popover(x, { @@ -31,47 +31,45 @@ const servers = computed(() => { \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaTopics.vue b/webui/src/components/dashboard/kafka/KafkaTopics.vue index 65486cdc6..922cd4073 100644 --- a/webui/src/components/dashboard/kafka/KafkaTopics.vue +++ b/webui/src/components/dashboard/kafka/KafkaTopics.vue @@ -1,9 +1,11 @@ \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/TopicConfig.vue b/webui/src/components/dashboard/kafka/TopicConfig.vue index 43556dc0c..a86dd4db1 100644 --- a/webui/src/components/dashboard/kafka/TopicConfig.vue +++ b/webui/src/components/dashboard/kafka/TopicConfig.vue @@ -87,6 +87,9 @@ const source = computed(() => { }) function messages(topic: KafkaTopic): string[] { + if (!topic.messages) { + return []; + } return Object.keys(topic.messages).sort((a, b) => { return a.localeCompare(b) }) diff --git a/webui/src/types/kafka.d.ts b/webui/src/types/kafka.d.ts index e7138cc95..3dc2f87a3 100644 --- a/webui/src/types/kafka.d.ts +++ b/webui/src/types/kafka.d.ts @@ -9,10 +9,10 @@ declare interface KafkaServer { name: string; host: string; description: string; - tags: KafkaServerTag[] + tags: KafkaTag[] } -declare interface KafkaServerTag { +declare interface KafkaTag { name: string description: string } @@ -22,6 +22,7 @@ declare interface KafkaTopic { description: string; partitions: KafkaPartition[]; messages: { [messageId: string]: KafkaMessage } + tags: KafkaTag[] } declare interface KafkaMessage { From fac1c87b9cb2b7c9186fbef846800f0191ae2a8e Mon Sep 17 00:00:00 2001 From: maesi Date: Tue, 27 Jan 2026 16:43:18 +0100 Subject: [PATCH 19/85] add support of channel tags --- webui/e2e/Dashboard/kafka/cluster.spec.ts | 8 ++--- webui/e2e/Dashboard/kafka/cluster.ts | 37 +++++------------------ webui/src/composables/local-storage.ts | 30 ++++++++++++++++++ 3 files changed, 42 insertions(+), 33 deletions(-) create mode 100644 webui/src/composables/local-storage.ts diff --git a/webui/e2e/Dashboard/kafka/cluster.spec.ts b/webui/e2e/Dashboard/kafka/cluster.spec.ts index d5dbfac42..8d6dfebf0 100644 --- a/webui/e2e/Dashboard/kafka/cluster.spec.ts +++ b/webui/e2e/Dashboard/kafka/cluster.spec.ts @@ -43,10 +43,12 @@ test('Visit Kafka cluster "Kafka World"', async ({ page }) => { const table = page.getByRole('table', { name: 'Topics' }) await expect(table).toBeVisible() const topics = useKafkaTopics(table) - await topics.testTopic(0, cluster.topics[0]) - await topics.testTopic(0, cluster.topics[0]) + await topics.testTopic(1, cluster.topics[0]) + await topics.testTopic(2, cluster.topics[1]) }) + await useKafkaMessages(page).test(page.getByRole('region', { name: "Recent Messages" }).getByRole('table', { name: 'Recent Messages' })) + await test.step('Check groups section', async () => { await page.getByRole('tab', { name: 'Groups' }).click(); @@ -66,8 +68,6 @@ test('Visit Kafka cluster "Kafka World"', async ({ page }) => { await expect(config.getCellByName('Provider')).toHaveText('HTTP') await expect(config.getCellByName('Last Update')).toHaveText(formatDateTime('2023-02-15T08:49:25.482366+01:00')) }) - - await useKafkaMessages(page).test(page.getByRole('region', { name: "Recent Messages" }).getByRole('table', { name: 'Recent Messages' })) }) test('Visit Kafka cluster config file', async ({ page, context }) => { diff --git a/webui/e2e/Dashboard/kafka/cluster.ts b/webui/e2e/Dashboard/kafka/cluster.ts index 3b95a17c3..60ae6e487 100644 --- a/webui/e2e/Dashboard/kafka/cluster.ts +++ b/webui/e2e/Dashboard/kafka/cluster.ts @@ -57,8 +57,8 @@ export const cluster = { { name: 'mokapi.shop.userSignedUp', description: 'This channel contains a message per each user who signs up in our application.', - lastMessage: formatTimestamp(1652035690), - messages: '1', + lastMessage: '-', + messages: '0', partitions: [ { id: '0', @@ -97,38 +97,17 @@ export const cluster = { name: 'foo', state: 'Stable', protocol: 'Range', - leader: 'julie', - members: [ - { - name: 'julie', - address: '127.0.0.1:15001', - clientSoftware: 'mokapi 1.0', - lastHeartbeat: formatTimestamp(1654771269), - partitions: { 'mokapi.shop.products': [ 0, 1 ], 'mokapi.shop.userSignedUp': [ 0 ] } - }, - { - name: 'herman', - address: '127.0.0.1:15002', - clientSoftware: 'mokapi 1.0', - lastHeartbeat: formatTimestamp(1654872269), - partitions: { 'mokapi.shop.products': [ 2 ], 'mokapi.shop.userSignedUp': [ ] } - } - ], + generation: '-', + lastRebalancing: '-', + members: 2, }, { name: 'bar', state: 'Stable', protocol: 'Range', - leader: 'george', - members: [ - { - name: 'george', - address: '127.0.0.1:15003', - clientSoftware: 'mokapi 1.0', - lastHeartbeat: formatTimestamp(1654721269), - partitions: { 'mokapi.shop.userSignedUp': [ 0 ] } - }, - ], + generation: '-', + lastRebalancing: '-', + members: 1 } ] } \ No newline at end of file diff --git a/webui/src/composables/local-storage.ts b/webui/src/composables/local-storage.ts new file mode 100644 index 000000000..a91ce6f27 --- /dev/null +++ b/webui/src/composables/local-storage.ts @@ -0,0 +1,30 @@ +import { ref, watch, type Ref } from 'vue'; + +const store = new Map(); + +export function useLocalStorage(key: string, defaultValue: T): Ref { + if (store.has(key)) { + return store.get(key); + } + + const stored = localStorage.getItem(key); + const state = ref(stored ? JSON.parse(stored) : defaultValue) as Ref; + + watch( + state, + value => { + localStorage.setItem(key, JSON.stringify(value)); + }, + { deep: true } + ); + + // Cross-tab sync + window.addEventListener('storage', e => { + if (e.key === key && e.newValue) { + state.value = JSON.parse(e.newValue) as T; + } + }); + + store.set(key, state); + return state; +} \ No newline at end of file From efe2c93c5c06efe75e8a39d4bdaaa2c69bb1ea2d Mon Sep 17 00:00:00 2001 From: marle3003 Date: Tue, 27 Jan 2026 19:02:24 +0100 Subject: [PATCH 20/85] fix concurrent map read/write --- providers/asyncapi3/kafka/store/store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/asyncapi3/kafka/store/store.go b/providers/asyncapi3/kafka/store/store.go index b86ffd994..99afe5e5a 100644 --- a/providers/asyncapi3/kafka/store/store.go +++ b/providers/asyncapi3/kafka/store/store.go @@ -208,10 +208,9 @@ func (s *Store) ServeMessage(rw kafka.ResponseWriter, req *kafka.Request) { client := kafka.ClientFromContext(req.Context) if client != nil { + s.m.Lock() if _, ok := s.clients[client.ClientId]; !ok { - s.m.Lock() s.clients[client.ClientId] = client - s.m.Unlock() client.Close = func() { s.m.Lock() defer s.m.Unlock() @@ -219,6 +218,7 @@ func (s *Store) ServeMessage(rw kafka.ResponseWriter, req *kafka.Request) { delete(s.clients, client.ClientId) } } + s.m.Unlock() } switch req.Message.(type) { From fb53f62f7f850bfe554d227b60d9cf651a306fc0 Mon Sep 17 00:00:00 2001 From: maesi Date: Tue, 27 Jan 2026 21:02:31 +0100 Subject: [PATCH 21/85] improve tab click event improve dashboard to prevent the accidental loading of demo data --- .../dashboard/kafka/KafkaService.vue | 2 - .../components/dashboard/kafka/KafkaTopic.vue | 2 - webui/src/composables/dashboard.demo.ts | 452 +++++++++--------- webui/src/composables/dashboard.ts | 16 +- webui/src/views/DashboardView.vue | 5 +- 5 files changed, 242 insertions(+), 235 deletions(-) diff --git a/webui/src/components/dashboard/kafka/KafkaService.vue b/webui/src/components/dashboard/kafka/KafkaService.vue index 7dfeede99..f2e2b4c73 100644 --- a/webui/src/components/dashboard/kafka/KafkaService.vue +++ b/webui/src/components/dashboard/kafka/KafkaService.vue @@ -34,8 +34,6 @@ const activeTab = ref('tab-topics'); function setTab(tab: string) { router.replace({ - path: route.path, - query: route.query, hash: `#${tab}` }); } diff --git a/webui/src/components/dashboard/kafka/KafkaTopic.vue b/webui/src/components/dashboard/kafka/KafkaTopic.vue index 025c84b5f..3368f2bfe 100644 --- a/webui/src/components/dashboard/kafka/KafkaTopic.vue +++ b/webui/src/components/dashboard/kafka/KafkaTopic.vue @@ -29,8 +29,6 @@ const activeTab = ref('tab-messages'); function setTab(tab: string) { router.replace( { - path: route.path, - query: route.query, hash: `#${tab}` }); } diff --git a/webui/src/composables/dashboard.demo.ts b/webui/src/composables/dashboard.demo.ts index e82c40e07..b9ecba72a 100644 --- a/webui/src/composables/dashboard.demo.ts +++ b/webui/src/composables/dashboard.demo.ts @@ -3,269 +3,277 @@ import { transformPath, useFetch } from "./fetch"; import type { Dashboard, ExampleRequest, ExampleResult, MailboxMessagesResult, MailboxResult } from "@/types/dashboard"; import { usePrettyLanguage } from "./usePrettyLanguage"; -const { formatLanguage } = usePrettyLanguage(); +export function useDemoDashboard() { -const demo = ref(null); + const { formatLanguage } = usePrettyLanguage(); -const data = computed(async () => { - let res = await fetch(transformPath('/demo/dashboard.json')); - if (!res.ok) { - let text = await res.text() - throw new Error(res.statusText + ': ' + text) - } - return await res.json() -}) - -watchEffect(async () => { - demo.value = await data.value; -}) - -export const dashboard: Dashboard = { - - getAppInfo() { - let result = { - version: '0.0', - activeServices: [], - search: { enabled: false } - }; - if (demo.value) { - result = demo.value.info; + const demo = ref(null); + + const data = computed(async () => { + let res = await fetch(transformPath('/demo/dashboard.json')); + if (!res.ok) { + let text = await res.text() + throw new Error(res.statusText + ': ' + text) } + return await res.json() + }) + + watchEffect(async () => { + demo.value = await data.value; + }) + + const dashboard: Dashboard = { + + getAppInfo() { + let result = { + version: '0.0', + activeServices: [], + search: { enabled: false } + }; + if (demo.value) { + result = demo.value.info; + } - return { data: result, isLoading: false, error: '', close: () => {} }; - }, + return { data: result, isLoading: false, error: '', close: () => { } }; + }, - getServices(type) { - let result = [] - if (demo.value) { - const response = demo.value['services'] + getServices(type) { + let result = [] + if (demo.value) { + const response = demo.value['services'] - if (response) { - if (type) { - for (let service of response) { - if (service.type == type){ - result.push(service) + if (response) { + if (type) { + for (let service of response) { + if (service.type == type) { + result.push(service) + } } + } else { + result = response } - }else{ - result = response } } - } - - const services = ref([]) - services.value = result.sort(compareService) - return { services, close: () => {}} - }, - getService(name, type) { - let result = null; - if (demo.value) { - result = demo.value['service_'+name] - } + const services = ref([]) + services.value = result.sort(compareService) + return { services, close: () => { } } + }, - const service = ref(result) - const isLoading = ref(false) - return {service, isLoading, close: () => {}} - }, + getService(name, type) { + let result = null; + if (demo.value) { + result = demo.value['service_' + name] + } - getEvents(namespace: string, ...labels: Label[]) { - let events = null; - if (demo.value) { - events = demo.value['events'] - } + const service = ref(result) + const isLoading = ref(false) + return { service, isLoading, close: () => { } } + }, - const result = [] - for (const event of events) { - if (event.traits.namespace !== namespace) { - continue + getEvents(namespace: string, ...labels: Label[]) { + let events = null; + if (demo.value) { + events = demo.value['events'] } - let isValid = true - for (const label of labels) { - if (event.traits[label.name] !== label.value) { - isValid = false + + const result = [] + if (events) { + for (const event of events) { + if (event.traits.namespace !== namespace) { + continue + } + let isValid = true + for (const label of labels) { + if (event.traits[label.name] !== label.value) { + isValid = false + } + } + if (isValid) { + result.push(event) + } } } - if (isValid) { - result.push(event) - } - } - - return { events: ref(result), close: () => {} } - }, - - getEvent(id: string){ - let event = null; - if (demo.value) { - const events = demo.value['events']; - for (const e of events) { - if (e.id === id) { - event = e; + + return { events: ref(result), close: () => { } } + }, + + getEvent(id: string) { + let event = null; + if (demo.value) { + const events = demo.value['events']; + for (const e of events) { + if (e.id === id) { + event = e; + } } } - } - return {event: ref(event), isLoading: ref(false), close: () => {}} - }, + return { event: ref(event), isLoading: ref(false), close: () => { } } + }, - getMetrics(query) { - let metrics = [] - if (demo.value) { - metrics = demo.value['metrics'] - } - return { - data: metrics, - isLoading: false, - error: null, - close: () => {}, - refs: 1, - refresh: () => {}, - } - }, - - getExample(request: ExampleRequest) { - const response = useFetch('/api/schema/example', { - method: 'POST', - body: JSON.stringify({name: request.name, schema: request.schema.schema, format: request.schema.format, contentTypes: request.contentTypes}), - headers: {'Content-Type': 'application/json', 'Accept': 'application/json'}}, - false, false) - const res: ExampleResult = reactive({ - data: [], - next: () => response.refresh(), - error: null - }) - - watchEffect(() => { - if (response.isLoading) { - return + getMetrics(query) { + let metrics = [] + if (demo.value) { + metrics = demo.value['metrics'] } - if (response.error) { - res.error = response.error - res.data = [] - return + return { + data: metrics, + isLoading: false, + error: null, + close: () => { }, + refs: 1, + refresh: () => { }, } + }, + + getExample(request: ExampleRequest) { + const response = useFetch('/api/schema/example', { + method: 'POST', + body: JSON.stringify({ name: request.name, schema: request.schema.schema, format: request.schema.format, contentTypes: request.contentTypes }), + headers: { 'Content-Type': 'application/json', 'Accept': 'application/json' } + }, + false, false) + const res: ExampleResult = reactive({ + data: [], + next: () => response.refresh(), + error: null + }) + + watchEffect(() => { + if (response.isLoading) { + return + } + if (response.error) { + res.error = response.error + res.data = [] + return + } - res.data = response.data - for (const example of res.data) { - example.value = atob(example.value) - example.value = formatLanguage(example.value, example.contentType!) + res.data = response.data + for (const example of res.data) { + example.value = atob(example.value) + example.value = formatLanguage(example.value, example.contentType!) + } + }) + return res + }, + + getMailbox(service: string, mailbox: string): MailboxResult { + let mb = null; + if (demo.value) { + mb = demo.value[`mailbox_${mailbox}`]; } - }) - return res - }, - - getMailbox(service: string, mailbox: string): MailboxResult { - let mb = null; - if (demo.value) { - mb = demo.value[`mailbox_${mailbox}`]; - } - return {mailbox: ref(mb), isLoading: ref(false)} - }, - - getMailboxMessages(service: string, mailbox: string): MailboxMessagesResult { - const result = []; - if (demo.value) { - for (const mail of demo.value['mails']) { - if (mail.data.to.filter((x: any) => x.address === mailbox).length > 0) { - result.push({ - messageId: mail.data.messageId, - subject: mail.data.subject, - from: mail.data.from, - to: mail.data.to, - date: mail.data.date - }) + return { mailbox: ref(mb), isLoading: ref(false) } + }, + + getMailboxMessages(service: string, mailbox: string): MailboxMessagesResult { + const result = []; + if (demo.value) { + for (const mail of demo.value['mails']) { + if (mail.data.to.filter((x: any) => x.address === mailbox).length > 0) { + result.push({ + messageId: mail.data.messageId, + subject: mail.data.subject, + from: mail.data.from, + to: mail.data.to, + date: mail.data.date + }) + } } } - } - const messages = ref(result) - const isLoading = ref(true) - return { messages: messages, isLoading, close: () => {} } - }, + const messages = ref(result) + const isLoading = ref(true) + return { messages: messages, isLoading, close: () => { } } + }, - getMail(messageId: string) { - let mail = null; - if (demo.value) { - mail = demo.value['mails'].find((x: any) => x.data.messageId === messageId); - } + getMail(messageId: string) { + let mail = null; + if (demo.value) { + mail = demo.value['mails'].find((x: any) => x.data.messageId === messageId); + } - return { mail: ref(mail), isLoading: ref(false) } - }, + return { mail: ref(mail), isLoading: ref(false) } + }, - getAttachmentUrl(messageId: string, name: string): string { - const result = this.getMail(messageId) - if (!result.mail.value) { - return ''; - } - const mail = result.mail.value; - const attachment = mail.data.attachments.find((x: any) => x.name === name); - if (!attachment) { - return ''; - } - return transformPath(`/demo/${getFilename(attachment.contentType)}`) - }, + getAttachmentUrl(messageId: string, name: string): string { + const result = this.getMail(messageId) + if (!result.mail.value) { + return ''; + } + const mail = result.mail.value; + const attachment = mail.data.attachments.find((x: any) => x.name === name); + if (!attachment) { + return ''; + } + return transformPath(`/demo/${getFilename(attachment.contentType)}`) + }, - getConfigs() { - let result = null; - if (demo.value) { - result = demo.value.configs; - } + getConfigs() { + let result = null; + if (demo.value) { + result = demo.value.configs; + } - return { data: ref(result), isLoading: ref(false), close: () => {} } - }, + return { data: ref(result), isLoading: ref(false), close: () => { } } + }, - getConfig(id: string) { - let result = null; - if (demo.value) { - result = demo.value.configs.find((x: Config) => x.id === id); - } + getConfig(id: string) { + let result = null; + if (demo.value) { + result = demo.value.configs.find((x: Config) => x.id === id); + } - return { config: ref(result), isLoading: ref(false), close: () => {} } - }, + return { config: ref(result), isLoading: ref(false), close: () => { } } + }, - getConfigData(id) { - let config = null; - if (demo.value) { - config = demo.value.configs.find((x: Config) => x.id === id); - } + getConfigData(id) { + let config = null; + if (demo.value) { + config = demo.value.configs.find((x: Config) => x.id === id); + } - const response = useFetch(this.getConfigDataUrl(id)); - const data = ref(null); - const isLoading = ref(true); - const filename = ref(); + const response = useFetch(this.getConfigDataUrl(id)); + const data = ref(null); + const isLoading = ref(true); + const filename = ref(); - watchEffect(() => { - data.value = response.data ? response.data : null; - isLoading.value = response.isLoading; - filename.value = getFilenameFromUrl(config.url); - }) + watchEffect(() => { + data.value = response.data ? response.data : null; + isLoading.value = response.isLoading; + filename.value = getFilenameFromUrl(config.url); + }) - return { data, isLoading, filename: filename, close: () => {} } - }, + return { data, isLoading, filename: filename, close: () => { } } + }, - getConfigDataUrl(id) { - let config = null; - if (demo.value) { - config = demo.value.configs.find((x: Config) => x.id === id); + getConfigDataUrl(id) { + let config = null; + if (demo.value) { + config = demo.value.configs.find((x: Config) => x.id === id); + } + return '/demo/' + getFilenameFromUrl(config.url) + }, + } + + function compareService(s1: Service, s2: Service) { + const name1 = s1.name.toLowerCase() + const name2 = s2.name.toLowerCase() + return name1.localeCompare(name2) + } + + function getFilename(contentType: string) { + const match = contentType.match(/name=([^;]+)/); + if (match && match[1]) { + return match[1]; } - return '/demo/' + getFilenameFromUrl(config.url) - }, -} - -function compareService(s1: Service, s2: Service) { - const name1 = s1.name.toLowerCase() - const name2 = s2.name.toLowerCase() - return name1.localeCompare(name2) -} - -function getFilename(contentType: string) { - const match = contentType.match(/name=([^;]+)/); - if (match && match[1]) { - return match[1]; + return null; + } + + function getFilenameFromUrl(url: string): string { + return new URL(url).pathname.split('/').pop()!; } - return null; -} -function getFilenameFromUrl(url: string): string { - return new URL(url).pathname.split('/').pop()!; + return dashboard } \ No newline at end of file diff --git a/webui/src/composables/dashboard.ts b/webui/src/composables/dashboard.ts index d6e31d94d..9adf62cf1 100644 --- a/webui/src/composables/dashboard.ts +++ b/webui/src/composables/dashboard.ts @@ -9,14 +9,16 @@ type Mode = 'live' | 'demo' const mode = ref('live') -export function useDashboard() { - const dashboard = computed(() => { - if (mode.value === 'live') { - return live.dashboard - } - return demo.dashboard - }) +const dashboard = computed(() => { + console.log(mode.value) + if (mode.value === 'live') { + return live.dashboard + } + return demo.useDemoDashboard() +}) +export function useDashboard() { + function setMode(m: Mode) { mode.value = m } diff --git a/webui/src/views/DashboardView.vue b/webui/src/views/DashboardView.vue index 23dd3d7eb..5293f43f3 100644 --- a/webui/src/views/DashboardView.vue +++ b/webui/src/views/DashboardView.vue @@ -49,7 +49,9 @@ const transitionRefresh = computed(() => { const { dashboard, setMode, getMode } = useDashboard(); const route = useRoute() -const appInfo = ref() +const appInfo = computed(() => { + return dashboard.value.getAppInfo() +}) onMounted(() => { if (route.meta.mode === 'live') { start(); @@ -57,7 +59,6 @@ onMounted(() => { if (route.meta.mode) { setMode(route.meta.mode as 'live' | 'demo') } - appInfo.value = dashboard.value.getAppInfo() onUnmounted(() => { appInfo.value?.close() }) From e63a5db02f74fcfbc4de6cb0573506c7499e0e61 Mon Sep 17 00:00:00 2001 From: maesi Date: Tue, 27 Jan 2026 23:36:15 +0100 Subject: [PATCH 22/85] add logging Kafka request JoinGroup and displaying in dashboard (WIP) --- api/handler_events_test.go | 2 +- examples/mokapi/kafka.js | 3 + .../asyncapi3/kafka/store/group_balancer.go | 28 +++++++ providers/asyncapi3/kafka/store/joingroup.go | 12 +++ providers/asyncapi3/kafka/store/log.go | 41 ++++++++-- providers/asyncapi3/kafka/store/log_test.go | 9 ++- providers/asyncapi3/kafka/store/partition.go | 2 +- .../asyncapi3/kafka/store/partition_test.go | 20 ++--- .../asyncapi3/kafka/store/produce_test.go | 24 +++--- providers/asyncapi3/kafka/store/store.go | 6 +- providers/asyncapi3/kafka/store/topic.go | 2 +- providers/asyncapi3/kafka/store/validation.go | 8 +- .../asyncapi3/kafka/store/validation_test.go | 26 +++---- .../dashboard/kafka/KafkaClient.vue | 38 +++++---- .../dashboard/kafka/KafkaMessages.vue | 16 +++- .../dashboard/kafka/KafkaMessagesCard.vue | 12 ++- .../dashboard/kafka/KafkaRequests.vue | 78 +++++++++++++++++++ .../dashboard/kafka/requests/JoinGroup.vue | 38 +++++++++ webui/src/types/kafka.d.ts | 20 ++++- 19 files changed, 311 insertions(+), 74 deletions(-) create mode 100644 webui/src/components/dashboard/kafka/KafkaRequests.vue create mode 100644 webui/src/components/dashboard/kafka/requests/JoinGroup.vue diff --git a/api/handler_events_test.go b/api/handler_events_test.go index 838555a0d..e05f8d85c 100644 --- a/api/handler_events_test.go +++ b/api/handler_events_test.go @@ -274,7 +274,7 @@ func TestHandler_KafkaEvents(t *testing.T) { fn: func(t *testing.T, h http.Handler, sm *events.StoreManager) { sm.SetStore(1, events.NewTraits().WithNamespace("kafka")) - err := sm.Push(&store.KafkaLog{ + err := sm.Push(&store.KafkaMessageLog{ Offset: 123, Key: store.LogValue{}, Message: store.LogValue{}, diff --git a/examples/mokapi/kafka.js b/examples/mokapi/kafka.js index 61d7f7070..8fd3685a8 100644 --- a/examples/mokapi/kafka.js +++ b/examples/mokapi/kafka.js @@ -261,6 +261,7 @@ export let events = [ traits: { namespace: 'kafka', name: 'Kafka World', + type: 'message', topic: 'mokapi.shop.products' }, time: '2023-02-13T09:49:25.482366+01:00', @@ -296,6 +297,7 @@ export let events = [ traits: { namespace: 'kafka', name: 'Kafka World', + type: 'message', topic: 'mokapi.shop.products' }, time: '2023-02-13T09:49:25.482366+01:00', @@ -331,6 +333,7 @@ export let events = [ traits: { namespace: 'kafka', name: 'Kafka World', + type: 'message', topic: 'mokapi.shop.avro' }, time: '2025-02-13T09:49:25.482366+01:00', diff --git a/providers/asyncapi3/kafka/store/group_balancer.go b/providers/asyncapi3/kafka/store/group_balancer.go index a0fc3271e..dc6b4d843 100644 --- a/providers/asyncapi3/kafka/store/group_balancer.go +++ b/providers/asyncapi3/kafka/store/group_balancer.go @@ -30,6 +30,7 @@ type joindata struct { protocols []joinGroup.Protocol rebalanceTimeout int sessionTimeout int + log func(data *KafkaRequestLog, clientId string) } type syncdata struct { @@ -232,6 +233,10 @@ StopWaitingForConsumers: ProtocolType: j.protocolType, ProtocolName: protocol, }) + go func() { + l := newKafkaJoinGroupLog(b.group.Name, j, memberId) + j.log(l, j.client.ClientId) + }() } go b.respond(leader.writer, &joinGroup.Response{ @@ -242,6 +247,10 @@ StopWaitingForConsumers: ProtocolName: protocol, Members: members, }) + go func() { + l := newKafkaJoinGroupLog(b.group.Name, leader, generation.LeaderId) + leader.log(l, leader.client.ClientId) + }() } func (b *groupBalancer) sendRebalanceInProgress(w kafka.ResponseWriter) { @@ -282,3 +291,22 @@ func newGroupAssignment(b []byte) *groupAssignment { return g } + +func newKafkaJoinGroupLog(name string, j joindata, memberId string) *KafkaRequestLog { + req := &KafkaJoinGroupRequest{ + KafkaRequestBase: KafkaRequestBase{ + RequestKey: kafka.JoinGroup, + RequestName: "JoinGroup", + }, + GroupName: name, + MemberId: memberId, + ProtocolType: j.protocolType, + } + for _, proto := range j.protocols { + req.Protocols = append(req.Protocols, proto.Name) + } + + return &KafkaRequestLog{ + Request: req, + } +} diff --git a/providers/asyncapi3/kafka/store/joingroup.go b/providers/asyncapi3/kafka/store/joingroup.go index 603265c91..2a2eac16d 100644 --- a/providers/asyncapi3/kafka/store/joingroup.go +++ b/providers/asyncapi3/kafka/store/joingroup.go @@ -3,6 +3,7 @@ package store import ( "mokapi/kafka" "mokapi/kafka/joinGroup" + "mokapi/runtime/events" ) func (s *Store) joingroup(rw kafka.ResponseWriter, req *kafka.Request) error { @@ -28,6 +29,7 @@ func (s *Store) joingroup(rw kafka.ResponseWriter, req *kafka.Request) error { protocols: r.Protocols, rebalanceTimeout: int(r.RebalanceTimeoutMs), sessionTimeout: int(r.SessionTimeoutMs), + log: s.logJoinGroupRequest, } // balancer writes the response @@ -35,3 +37,13 @@ func (s *Store) joingroup(rw kafka.ResponseWriter, req *kafka.Request) error { return nil } + +func (s *Store) logJoinGroupRequest(log *KafkaRequestLog, clientId string) { + log.Api = s.cluster + t := events.NewTraits(). + WithNamespace("kafka"). + WithName(s.cluster). + With("type", "request"). + With("clientId", clientId) + _ = s.eh.Push(log, t) +} diff --git a/providers/asyncapi3/kafka/store/log.go b/providers/asyncapi3/kafka/store/log.go index fcc544f01..6918064c5 100644 --- a/providers/asyncapi3/kafka/store/log.go +++ b/providers/asyncapi3/kafka/store/log.go @@ -1,13 +1,14 @@ package store import ( + "fmt" "mokapi/kafka" "mokapi/runtime/events" ) -type LogRecord func(log *KafkaLog, traits events.Traits) +type LogRecord func(log *KafkaMessageLog, traits events.Traits) -type KafkaLog struct { +type KafkaMessageLog struct { Offset int64 `json:"offset"` Key LogValue `json:"key"` Message LogValue `json:"message"` @@ -29,7 +30,7 @@ type LogValue struct { Binary []byte `json:"binary"` } -func (l *KafkaLog) Title() string { +func (l *KafkaMessageLog) Title() string { if l.Key.Value != "" { return l.Key.Value } else { @@ -37,8 +38,8 @@ func (l *KafkaLog) Title() string { } } -func newKafkaLog(record *kafka.Record) *KafkaLog { - return &KafkaLog{ +func newKafkaLog(record *kafka.Record) *KafkaMessageLog { + return &KafkaMessageLog{ Key: LogValue{Binary: kafka.Read(record.Key)}, Message: LogValue{Binary: kafka.Read(record.Value)}, Headers: convertHeader(record.Headers), @@ -47,3 +48,33 @@ func newKafkaLog(record *kafka.Record) *KafkaLog { SequenceNumber: record.SequenceNumber, } } + +type KafkaRequestData interface { + Title() string +} + +type KafkaRequestLog struct { + Api string `json:"api"` + Request KafkaRequestData `json:"request"` +} + +type KafkaRequestBase struct { + RequestKey kafka.ApiKey `json:"requestKey"` + RequestName string `json:"requestName"` +} + +type KafkaJoinGroupRequest struct { + KafkaRequestBase + GroupName string `json:"groupName"` + MemberId string `json:"memberId"` + ProtocolType string `json:"protocolType"` + Protocols []string `json:"protocols"` +} + +func (l *KafkaRequestLog) Title() string { + return l.Request.Title() +} + +func (r *KafkaJoinGroupRequest) Title() string { + return fmt.Sprintf("JoinGroup %s", r.GroupName) +} diff --git a/providers/asyncapi3/kafka/store/log_test.go b/providers/asyncapi3/kafka/store/log_test.go index 0a8133a89..3ce4ee878 100644 --- a/providers/asyncapi3/kafka/store/log_test.go +++ b/providers/asyncapi3/kafka/store/log_test.go @@ -1,18 +1,19 @@ package store_test import ( - "github.com/stretchr/testify/require" "mokapi/providers/asyncapi3/kafka/store" "testing" + + "github.com/stretchr/testify/require" ) func TestKafkaLog_Title(t *testing.T) { - v := store.KafkaLog{} + v := store.KafkaMessageLog{} require.Equal(t, "", v.Title()) - v = store.KafkaLog{Key: store.LogValue{Value: "foo"}} + v = store.KafkaMessageLog{Key: store.LogValue{Value: "foo"}} require.Equal(t, "foo", v.Title()) - v = store.KafkaLog{Key: store.LogValue{Binary: []byte("foo")}} + v = store.KafkaMessageLog{Key: store.LogValue{Binary: []byte("foo")}} require.Equal(t, "foo", v.Title()) } diff --git a/providers/asyncapi3/kafka/store/partition.go b/providers/asyncapi3/kafka/store/partition.go index 580f5db46..6391a1294 100644 --- a/providers/asyncapi3/kafka/store/partition.go +++ b/providers/asyncapi3/kafka/store/partition.go @@ -46,7 +46,7 @@ type Segment struct { type record struct { Data *kafka.Record - Log *KafkaLog + Log *KafkaMessageLog } type WriteOptions struct { diff --git a/providers/asyncapi3/kafka/store/partition_test.go b/providers/asyncapi3/kafka/store/partition_test.go index 840c63ef3..773be892f 100644 --- a/providers/asyncapi3/kafka/store/partition_test.go +++ b/providers/asyncapi3/kafka/store/partition_test.go @@ -16,7 +16,7 @@ func TestPartition(t *testing.T) { p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) {}, + func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, ) @@ -31,7 +31,7 @@ func TestPartition_Write(t *testing.T) { p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) { + func(log *KafkaMessageLog, traits events.Traits) { logs = append(logs, log.Offset) }, func(record *kafka.Record, schemaId int) bool { return false }, @@ -78,7 +78,7 @@ func TestPartition_Read_Empty(t *testing.T) { p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) {}, + func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, ) @@ -91,7 +91,7 @@ func TestPartition_Read(t *testing.T) { p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) {}, + func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, ) @@ -118,7 +118,7 @@ func TestPartition_Read_OutOfOffset_Empty(t *testing.T) { p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) {}, + func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, ) @@ -131,7 +131,7 @@ func TestPartition_Read_OutOfOffset(t *testing.T) { p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) {}, + func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, ) @@ -155,7 +155,7 @@ func TestPartition_Write_Value_Validator(t *testing.T) { p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, _ events.Traits) { + func(log *KafkaMessageLog, _ events.Traits) { }, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{Config: &asyncapi3.Channel{Bindings: asyncapi3.ChannelBindings{ Kafka: asyncapi3.TopicBindings{ValueSchemaValidation: true}, @@ -218,7 +218,7 @@ func TestPartition_Write_Value_Validator(t *testing.T) { func TestPatition_Retention(t *testing.T) { p := newPartition(0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) {}, + func(log *KafkaMessageLog, traits events.Traits) {}, func(record *kafka.Record, schemaId int) bool { return false }, &Topic{}, ) @@ -261,11 +261,11 @@ func TestPatition_Retention(t *testing.T) { } func TestPartition_Write_Producer_ClientId(t *testing.T) { - var logs []*KafkaLog + var logs []*KafkaMessageLog p := newPartition( 0, map[int]*Broker{1: {Id: 1}}, - func(log *KafkaLog, traits events.Traits) { + func(log *KafkaMessageLog, traits events.Traits) { logs = append(logs, log) }, func(record *kafka.Record, schemaId int) bool { return false }, diff --git a/providers/asyncapi3/kafka/store/produce_test.go b/providers/asyncapi3/kafka/store/produce_test.go index d22492fa9..7c7ff7ead 100644 --- a/providers/asyncapi3/kafka/store/produce_test.go +++ b/providers/asyncapi3/kafka/store/produce_test.go @@ -130,14 +130,14 @@ func TestProduce(t *testing.T) { logs := sm.GetEvents(events.NewTraits().WithNamespace("kafka").WithName("test").With("topic", "foo")) require.Len(t, logs, 2) - require.Equal(t, []byte("foo-2"), logs[0].Data.(*store.KafkaLog).Key.Binary) - require.Equal(t, []byte("bar-2"), logs[0].Data.(*store.KafkaLog).Message.Binary) - require.Equal(t, int64(1), logs[0].Data.(*store.KafkaLog).Offset) - require.Equal(t, "kafkatest", logs[0].Data.(*store.KafkaLog).ClientId) + require.Equal(t, []byte("foo-2"), logs[0].Data.(*store.KafkaMessageLog).Key.Binary) + require.Equal(t, []byte("bar-2"), logs[0].Data.(*store.KafkaMessageLog).Message.Binary) + require.Equal(t, int64(1), logs[0].Data.(*store.KafkaMessageLog).Offset) + require.Equal(t, "kafkatest", logs[0].Data.(*store.KafkaMessageLog).ClientId) require.Equal(t, "kafkatest", logs[0].Traits.Get("clientId")) - require.Equal(t, int64(0), logs[1].Data.(*store.KafkaLog).Offset) - require.Equal(t, "kafkatest", logs[1].Data.(*store.KafkaLog).ClientId) + require.Equal(t, int64(0), logs[1].Data.(*store.KafkaMessageLog).Offset) + require.Equal(t, "kafkatest", logs[1].Data.(*store.KafkaMessageLog).ClientId) }, }, { @@ -414,12 +414,12 @@ func TestProduce(t *testing.T) { logs := sm.GetEvents(events.NewTraits().WithNamespace("kafka").WithName("test").With("topic", "foo")) require.Len(t, logs, 1) - require.Equal(t, `"foo-1"`, string(logs[0].Data.(*store.KafkaLog).Key.Binary)) - require.Equal(t, "4", string(logs[0].Data.(*store.KafkaLog).Message.Binary)) - require.Equal(t, int64(0), logs[0].Data.(*store.KafkaLog).Offset) - require.Equal(t, int64(1), logs[0].Data.(*store.KafkaLog).ProducerId) - require.Equal(t, int16(0), logs[0].Data.(*store.KafkaLog).ProducerEpoch) - require.Equal(t, int32(0), logs[0].Data.(*store.KafkaLog).SequenceNumber) + require.Equal(t, `"foo-1"`, string(logs[0].Data.(*store.KafkaMessageLog).Key.Binary)) + require.Equal(t, "4", string(logs[0].Data.(*store.KafkaMessageLog).Message.Binary)) + require.Equal(t, int64(0), logs[0].Data.(*store.KafkaMessageLog).Offset) + require.Equal(t, int64(1), logs[0].Data.(*store.KafkaMessageLog).ProducerId) + require.Equal(t, int16(0), logs[0].Data.(*store.KafkaMessageLog).ProducerEpoch) + require.Equal(t, int32(0), logs[0].Data.(*store.KafkaMessageLog).SequenceNumber) }, }, { diff --git a/providers/asyncapi3/kafka/store/store.go b/providers/asyncapi3/kafka/store/store.go index 99afe5e5a..3ed221d91 100644 --- a/providers/asyncapi3/kafka/store/store.go +++ b/providers/asyncapi3/kafka/store/store.go @@ -327,9 +327,11 @@ func (s *Store) getBrokerByHost(addr string) *Broker { return nil } -func (s *Store) log(log *KafkaLog, traits events.Traits) { +func (s *Store) log(log *KafkaMessageLog, traits events.Traits) { log.Api = s.cluster - t := traits.WithNamespace("kafka").WithName(s.cluster) + t := traits.WithNamespace("kafka"). + WithName(s.cluster). + With("type", "message") if log.ClientId != "" { t = t.With("clientId", log.ClientId) } diff --git a/providers/asyncapi3/kafka/store/topic.go b/providers/asyncapi3/kafka/store/topic.go index 05bbaeeea..eebb3360d 100644 --- a/providers/asyncapi3/kafka/store/topic.go +++ b/providers/asyncapi3/kafka/store/topic.go @@ -73,7 +73,7 @@ func (t *Topic) update(config *asyncapi3.Channel, s *Store) { t.Partitions = t.Partitions[:numPartitions] } -func (t *Topic) log(r *KafkaLog, traits events.Traits) { +func (t *Topic) log(r *KafkaMessageLog, traits events.Traits) { t.logger(r, traits.With("topic", t.Name)) } diff --git a/providers/asyncapi3/kafka/store/validation.go b/providers/asyncapi3/kafka/store/validation.go index 8367f1b2d..327b23010 100644 --- a/providers/asyncapi3/kafka/store/validation.go +++ b/providers/asyncapi3/kafka/store/validation.go @@ -25,7 +25,7 @@ type validator struct { } type recordValidator interface { - Validate(record *kafka.Record) (*KafkaLog, error) + Validate(record *kafka.Record) (*KafkaMessageLog, error) } func newValidator(c *asyncapi3.Channel) *validator { @@ -41,7 +41,7 @@ func newValidator(c *asyncapi3.Channel) *validator { return v } -func (v *validator) Validate(record *kafka.Record) (l *KafkaLog, err error) { +func (v *validator) Validate(record *kafka.Record) (l *KafkaMessageLog, err error) { if v == nil { return newKafkaLog(record), nil } @@ -133,8 +133,8 @@ func newMessageValidator(messageId string, msg *asyncapi3.Message, channel *asyn return v } -func (mv *messageValidator) Validate(record *kafka.Record) (*KafkaLog, error) { - r := &KafkaLog{ +func (mv *messageValidator) Validate(record *kafka.Record) (*KafkaMessageLog, error) { + r := &KafkaMessageLog{ Key: LogValue{}, Message: LogValue{}, Headers: make(map[string]LogValue), diff --git a/providers/asyncapi3/kafka/store/validation_test.go b/providers/asyncapi3/kafka/store/validation_test.go index ea9e5db7d..875f42263 100644 --- a/providers/asyncapi3/kafka/store/validation_test.go +++ b/providers/asyncapi3/kafka/store/validation_test.go @@ -56,7 +56,7 @@ func TestValidation(t *testing.T) { require.Len(t, e, 2) // latest message is first require.Equal(t, - &store.KafkaLog{ + &store.KafkaMessageLog{ Offset: 1, Key: store.LogValue{Value: "", Binary: []byte("key-bar")}, Message: store.LogValue{Value: "", Binary: []byte("bar")}, @@ -67,9 +67,9 @@ func TestValidation(t *testing.T) { Deleted: false, Api: "test", }, - e[0].Data.(*store.KafkaLog)) + e[0].Data.(*store.KafkaMessageLog)) require.Equal(t, - &store.KafkaLog{ + &store.KafkaMessageLog{ Offset: 0, Key: store.LogValue{Value: "", Binary: []byte("key-foo")}, Message: store.LogValue{Value: "", Binary: []byte("foo")}, @@ -80,7 +80,7 @@ func TestValidation(t *testing.T) { Deleted: false, Api: "test", }, - e[1].Data.(*store.KafkaLog)) + e[1].Data.(*store.KafkaMessageLog)) }, }, { @@ -134,7 +134,7 @@ func TestValidation(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, []byte("foo"), e[0].Data.(*store.KafkaLog).Key.Binary) + require.Equal(t, []byte("foo"), e[0].Data.(*store.KafkaMessageLog).Key.Binary) }, }, { @@ -167,8 +167,8 @@ func TestValidation(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, `"12"`, e[0].Data.(*store.KafkaLog).Key.Value) - require.Equal(t, `"foo"`, e[0].Data.(*store.KafkaLog).Message.Value) + require.Equal(t, `"12"`, e[0].Data.(*store.KafkaMessageLog).Key.Value) + require.Equal(t, `"foo"`, e[0].Data.(*store.KafkaMessageLog).Message.Value) }, }, { @@ -199,7 +199,7 @@ func TestValidation(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, 1, e[0].Data.(*store.KafkaLog).SchemaId) + require.Equal(t, 1, e[0].Data.(*store.KafkaMessageLog).SchemaId) }, }, { @@ -254,7 +254,7 @@ func TestValidation(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, "123", e[0].Data.(*store.KafkaLog).Message.Value) + require.Equal(t, "123", e[0].Data.(*store.KafkaMessageLog).Message.Value) }, }, { @@ -330,7 +330,7 @@ func TestValidation_Header(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, "\u0001\u0000\u0000\u0000", e[0].Data.(*store.KafkaLog).Headers["foo"].Value) + require.Equal(t, "\u0001\u0000\u0000\u0000", e[0].Data.(*store.KafkaMessageLog).Headers["foo"].Value) }, }, { @@ -357,7 +357,7 @@ func TestValidation_Header(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, "1", e[0].Data.(*store.KafkaLog).Headers["foo"].Value) + require.Equal(t, "1", e[0].Data.(*store.KafkaMessageLog).Headers["foo"].Value) }, }, { @@ -381,7 +381,7 @@ func TestValidation_Header(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, []byte{1, 0, 0, 0}, e[0].Data.(*store.KafkaLog).Headers["foo"].Binary) + require.Equal(t, []byte{1, 0, 0, 0}, e[0].Data.(*store.KafkaMessageLog).Headers["foo"].Binary) }, }, { @@ -408,7 +408,7 @@ func TestValidation_Header(t *testing.T) { e := sm.GetEvents(events.NewTraits()) require.Len(t, e, 1) - require.Equal(t, "3.141629934310913", e[0].Data.(*store.KafkaLog).Headers["foo"].Value) + require.Equal(t, "3.141629934310913", e[0].Data.(*store.KafkaMessageLog).Headers["foo"].Value) }, }, } diff --git a/webui/src/components/dashboard/kafka/KafkaClient.vue b/webui/src/components/dashboard/kafka/KafkaClient.vue index c04483340..71f402fc7 100644 --- a/webui/src/components/dashboard/kafka/KafkaClient.vue +++ b/webui/src/components/dashboard/kafka/KafkaClient.vue @@ -2,9 +2,10 @@ import { getRouteName, useDashboard } from '@/composables/dashboard'; import { useKafka } from '@/composables/kafka'; import { useRoute, useRouter } from '@/router'; -import { computed, type Ref } from 'vue'; +import { computed } from 'vue'; import Message from '../../Message.vue'; import KafkaMessagesCard from './KafkaMessagesCard.vue' +import KafkaRequests from './KafkaRequests.vue' const route = useRoute(); const router = useRouter(); @@ -12,15 +13,21 @@ const { clientSoftware, formatAddress } = useKafka(); const { dashboard } = useDashboard(); const serviceName = route.params.service!.toString(); -const clientId = route.params.clientId?.toString(); +const clientId = route.params.clientId!.toString(); +const service = computed(() => { + const result = dashboard.value.getService(serviceName, 'kafka'); + if (!result.service.value) { + return { service: null, isLoading: result.isLoading } + } + return { data: result.service.value as KafkaService, isLoading: false } +}) + -const result = dashboard.value.getService(serviceName, 'kafka'); -const service = result.service as Ref const client = computed(() => { - if (!service.value) { + if (!service.value || !service.value.data) { return null; } - for (let client of service.value?.clients){ + for (let client of service.value.data.clients){ if (client.clientId == clientId) { return client; } @@ -35,7 +42,7 @@ function gotToMember(memberId: string, groupName: string, openInNewTab = false){ const to = { name: getRouteName('kafkaGroupMember').value, params: { - service: service.value?.name, + service: service.value.data?.name, group: groupName, member: memberId, }, @@ -50,7 +57,7 @@ function gotToMember(memberId: string, groupName: string, openInNewTab = false){ \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaMessages.vue b/webui/src/components/dashboard/kafka/KafkaMessages.vue index 4f21e15fd..fb8bafd99 100644 --- a/webui/src/components/dashboard/kafka/KafkaMessages.vue +++ b/webui/src/components/dashboard/kafka/KafkaMessages.vue @@ -14,9 +14,13 @@ const props = defineProps<{ clientId?: string }>() +const emit = defineEmits<{ + (e: "loaded", count: number): void +}>(); + const tags = useLocalStorage(`kafka-${props.service?.name}-tags`, ['__all']) const labels = computed(() => { - const result = []; + const result = [{ name: 'type', value: 'message' }]; if (props.service) { result.push({name: 'name', value: props.service.name}) } @@ -41,12 +45,16 @@ let tab: Tab const messages = computed(() => { const result = []; + emit("loaded", events.value.length); for (const event of events.value) { const data = eventData(event) if (!data){ continue } + console.log('tags:') + console.log(tags.value) + if (props.service && !props.topicName && !tags.value.includes('__all')) { const topic = props.service.topics.find(t => t.name === event.traits['topic']); if (!topic) { @@ -70,11 +78,11 @@ const messages = computed(() => { return result; }) -function eventData(event: ServiceEvent | null): KafkaEventData | null{ +function eventData(event: ServiceEvent | null): KafkaMessageData | null{ if (!event) { return null } - return event.data + return event.data as KafkaMessageData } function isAvro(event: ServiceEvent): boolean { const msg = getMessageConfig(event) @@ -259,7 +267,7 @@ function getContentType(msg: KafkaMessage): [string, boolean] { return [ msg.contentType, false ] } -function key(data: KafkaEventData | null): string { +function key(data: KafkaMessageData | null): string { if (!data) { return '' } diff --git a/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue b/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue index 03dd348c1..ed5abd4d5 100644 --- a/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue +++ b/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue @@ -1,18 +1,26 @@ \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/KafkaRequests.vue b/webui/src/components/dashboard/kafka/KafkaRequests.vue new file mode 100644 index 000000000..c37fde3fb --- /dev/null +++ b/webui/src/components/dashboard/kafka/KafkaRequests.vue @@ -0,0 +1,78 @@ + + + \ No newline at end of file diff --git a/webui/src/components/dashboard/kafka/requests/JoinGroup.vue b/webui/src/components/dashboard/kafka/requests/JoinGroup.vue new file mode 100644 index 000000000..95e8b3dc9 --- /dev/null +++ b/webui/src/components/dashboard/kafka/requests/JoinGroup.vue @@ -0,0 +1,38 @@ + + + + + \ No newline at end of file diff --git a/webui/src/types/kafka.d.ts b/webui/src/types/kafka.d.ts index 3dc2f87a3..8958edcea 100644 --- a/webui/src/types/kafka.d.ts +++ b/webui/src/types/kafka.d.ts @@ -68,7 +68,9 @@ declare interface KafkaMember { partitions: { [topicName: string]: number[] }; } -declare interface KafkaEventData { +declare type KafkaEventData = KafkaMessageData | KafkaRequestLog + +declare interface KafkaMessageData { offset: number; key: KafkaValue; message: KafkaValue; @@ -106,4 +108,20 @@ declare interface KafkaClient { memberId: string group: string }[] +} + +declare interface KafkaRequestLog { + request: KafkaRequest & KafkaJoinGroupRequest +} + +declare interface KafkaRequest { + requestKey: number + requestName: string +} + +declare interface KafkaJoinGroupRequest { + groupName: string + memberId: string + protocolType: string + protocols: string[] } \ No newline at end of file From f6c8daca38b5ff87bd2ffea00b1b8f0ccbcf8a4b Mon Sep 17 00:00:00 2001 From: maesi Date: Wed, 28 Jan 2026 07:26:59 +0100 Subject: [PATCH 23/85] fix displaying Kafka messages in client view --- .../src/components/dashboard/kafka/KafkaMessages.vue | 5 +---- .../components/dashboard/kafka/KafkaMessagesCard.vue | 12 +++++++++--- webui/src/composables/dashboard.ts | 1 - webui/src/views/DocsView.vue | 1 - 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/webui/src/components/dashboard/kafka/KafkaMessages.vue b/webui/src/components/dashboard/kafka/KafkaMessages.vue index fb8bafd99..dc7396ac5 100644 --- a/webui/src/components/dashboard/kafka/KafkaMessages.vue +++ b/webui/src/components/dashboard/kafka/KafkaMessages.vue @@ -52,10 +52,7 @@ const messages = computed(() => { continue } - console.log('tags:') - console.log(tags.value) - - if (props.service && !props.topicName && !tags.value.includes('__all')) { + if (props.service && !props.clientId && !props.topicName && !tags.value.includes('__all')) { const topic = props.service.topics.find(t => t.name === event.traits['topic']); if (!topic) { continue diff --git a/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue b/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue index ed5abd4d5..c782e161b 100644 --- a/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue +++ b/webui/src/components/dashboard/kafka/KafkaMessagesCard.vue @@ -1,8 +1,8 @@