From ac194760e611f2513eaca0a6c0c6397ebcf4791d Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Wed, 22 Oct 2025 10:39:19 -0700 Subject: [PATCH 1/2] guard --- server/internal/http/legacy/handler.go | 7 +++++++ server/internal/http/legacy/session.go | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/server/internal/http/legacy/handler.go b/server/internal/http/legacy/handler.go index bfd29f3a..07e35bde 100644 --- a/server/internal/http/legacy/handler.go +++ b/server/internal/http/legacy/handler.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/m1k1o/neko/server/internal/api" @@ -44,6 +45,7 @@ type LegacyHandler struct { bannedIPs map[string]struct{} sessionIPs map[string]string wsDialer *websocket.Dialer + mu sync.Mutex } func New(serverAddr string) *LegacyHandler { @@ -393,6 +395,8 @@ func (h *LegacyHandler) Route(r types.Router) { func (h *LegacyHandler) ban(sessionId string) error { // find session by id + h.mu.Lock() + defer h.mu.Unlock() ip, ok := h.sessionIPs[sessionId] if !ok { return fmt.Errorf("session not found") @@ -404,6 +408,9 @@ func (h *LegacyHandler) ban(sessionId string) error { func (h *LegacyHandler) isBanned(r *http.Request) bool { ip := getIp(r) + h.mu.Lock() + defer h.mu.Unlock() + _, ok := h.bannedIPs[ip] return ok } diff --git a/server/internal/http/legacy/session.go b/server/internal/http/legacy/session.go index 02f45ec0..473ff95e 100644 --- a/server/internal/http/legacy/session.go +++ b/server/internal/http/legacy/session.go @@ -181,6 +181,8 @@ func (s *session) create(username, password string) error { } s.id, s.ip = data.ID, getIp(s.r) + s.h.mu.Lock() + defer s.h.mu.Unlock() s.h.sessionIPs[s.id] = s.ip // save session ip by id s.token = data.Token s.name = data.Profile.Name @@ -204,5 +206,7 @@ func (s *session) destroy() { } // remove session id from ip map + s.h.mu.Lock() + defer s.h.mu.Unlock() delete(s.h.sessionIPs, s.id) } From c667c7488abb634d27a9de4e3cbe5acc4c4fc410 Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Wed, 22 Oct 2025 11:33:50 -0700 Subject: [PATCH 2/2] few other spots --- server/internal/capture/streamsink.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/internal/capture/streamsink.go b/server/internal/capture/streamsink.go index 0d0905d5..a9e5146e 100644 --- a/server/internal/capture/streamsink.go +++ b/server/internal/capture/streamsink.go @@ -143,6 +143,9 @@ func (manager *StreamSinkManagerCtx) ID() string { } func (manager *StreamSinkManagerCtx) Bitrate() uint64 { + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() + return manager.bitrate } @@ -151,7 +154,7 @@ func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec { } func (manager *StreamSinkManagerCtx) start() error { - if len(manager.listeners)+len(manager.listenersKf) == 0 { + if manager.ListenersCount() == 0 { err := manager.CreatePipeline() if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { return err @@ -164,7 +167,7 @@ func (manager *StreamSinkManagerCtx) start() error { } func (manager *StreamSinkManagerCtx) stop() { - if len(manager.listeners)+len(manager.listenersKf) == 0 { + if manager.ListenersCount() == 0 { manager.DestroyPipeline() manager.logger.Info().Msgf("last listener, stopping") } @@ -408,6 +411,8 @@ func (manager *StreamSinkManagerCtx) DestroyPipeline() { manager.pipelinesActive.Set(0) + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() manager.brBuckets = make(map[int]float64) manager.bitrate = 0 }