diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index 5257c1c..674f53b 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -2,9 +2,9 @@ name: E2E Tests on: push: - branches: [ main, develop ] + branches: [main, develop] pull_request: - branches: [ main, develop ] + branches: [main, develop] jobs: e2e-tests: @@ -14,115 +14,115 @@ jobs: fail-fast: false matrix: project: [chromium, firefox, webkit, "Mobile Chrome", "Mobile Safari"] - + steps: - - uses: actions/checkout@v4 - - - name: Setup Rust - uses: dtolnay/rust-toolchain@stable - with: - targets: wasm32-unknown-unknown - components: rustfmt, clippy - - - name: Cache Cargo registry and git trees - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry/index/ - ~/.cargo/registry/cache/ - ~/.cargo/git/db/ - key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-cargo-registry- - - - name: Cache Cargo binaries - uses: actions/cache@v4 - with: - path: ~/.cargo/bin/ - key: ${{ runner.os }}-cargo-bin-trunk-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-cargo-bin-trunk- - ${{ runner.os }}-cargo-bin- - - - name: Cache target directory - uses: actions/cache@v4 - with: - path: target/ - key: ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}-${{ hashFiles('**/*.rs') }} - restore-keys: | - ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}- - ${{ runner.os }}-target- - - - name: Install Rust dependencies - run: | - cargo install trunk --locked || echo "trunk already installed" - cargo install cargo-watch --locked || echo "cargo-watch already installed" - - - name: Setup Node.js - uses: actions/setup-node@v4 - with: - node-version: '20' - cache: 'npm' - cache-dependency-path: 'tests/e2e/package-lock.json' - - - name: Cache global npm packages - uses: actions/cache@v4 - with: - path: ~/.npm - key: ${{ runner.os }}-npm-global-${{ hashFiles('**/package-lock.json') }} - 
restore-keys: | - ${{ runner.os }}-npm-global- - - - name: Cache frontend node_modules - uses: actions/cache@v4 - with: - path: crates/kiko-frontend/node_modules - key: ${{ runner.os }}-frontend-node-modules-tailwindcss - restore-keys: | - ${{ runner.os }}-frontend-node-modules- - - - name: Install Node.js dependencies - run: | - # Install tailwindcss globally and create local node_modules in frontend - npm install -g tailwindcss - cd crates/kiko-frontend - mkdir -p node_modules - npm install tailwindcss --no-save - cd ../../tests/e2e && npm install - - - name: Cache Playwright Browsers - uses: actions/cache@v4 - with: - path: ~/.cache/ms-playwright - key: ${{ runner.os }}-playwright-${{ hashFiles('tests/e2e/package-lock.json') }} - restore-keys: | - ${{ runner.os }}-playwright- - - - name: Install Playwright Browsers - run: cd tests/e2e && npx playwright install --with-deps - - - name: Build Rust project - run: | - cargo build --release - cd crates/kiko-frontend && RUSTFLAGS="-C target-feature=-nontrapping-fptoint" trunk build --release - - - name: Run E2E tests - run: cd tests/e2e && npx playwright test --project="${{ matrix.project }}" - env: - CI: true - - - name: Upload Playwright Report - uses: actions/upload-artifact@v4 - if: failure() - with: - name: playwright-report - path: tests/e2e/playwright-report/ - retention-days: 30 - - - name: Upload test screenshots - uses: actions/upload-artifact@v4 - if: failure() - with: - name: test-screenshots - path: tests/e2e/test-screenshots/ - retention-days: 30 \ No newline at end of file + - uses: actions/checkout@v4 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + with: + targets: wasm32-unknown-unknown + components: rustfmt, clippy + + - name: Cache Cargo registry and git trees + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os 
}}-cargo-registry- + + - name: Cache Cargo binaries + uses: actions/cache@v4 + with: + path: ~/.cargo/bin/ + key: ${{ runner.os }}-cargo-bin-trunk-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-bin-trunk- + ${{ runner.os }}-cargo-bin- + + - name: Cache target directory + uses: actions/cache@v4 + with: + path: target/ + key: ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}-${{ hashFiles('**/*.rs') }} + restore-keys: | + ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}- + ${{ runner.os }}-target- + + - name: Install Rust dependencies + run: | + cargo install trunk --locked || echo "trunk already installed" + cargo install cargo-watch --locked || echo "cargo-watch already installed" + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "20" + cache: "npm" + cache-dependency-path: "tests/e2e/package-lock.json" + + - name: Cache global npm packages + uses: actions/cache@v4 + with: + path: ~/.npm + key: ${{ runner.os }}-npm-global-${{ hashFiles('**/package-lock.json') }} + restore-keys: | + ${{ runner.os }}-npm-global- + + - name: Cache frontend node_modules + uses: actions/cache@v4 + with: + path: crates/kiko-frontend/node_modules + key: ${{ runner.os }}-frontend-node-modules-tailwindcss + restore-keys: | + ${{ runner.os }}-frontend-node-modules- + + - name: Install Node.js dependencies + run: | + # Install tailwindcss globally and create local node_modules in frontend + npm install -g tailwindcss + cd crates/kiko-frontend + mkdir -p node_modules + npm install tailwindcss --no-save + cd ../../tests/e2e && npm install + + - name: Cache Playwright Browsers + uses: actions/cache@v4 + with: + path: ~/.cache/ms-playwright + key: ${{ runner.os }}-playwright-${{ hashFiles('tests/e2e/package-lock.json') }} + restore-keys: | + ${{ runner.os }}-playwright- + + - name: Install Playwright Browsers + run: cd tests/e2e && npx playwright install --with-deps + + - name: Build Rust project + run: | + cargo build 
--release + cd crates/kiko-frontend && RUSTFLAGS="-C target-feature=-nontrapping-fptoint" trunk build --release + + - name: Run E2E tests + run: cd tests/e2e && npx playwright test --project="${{ matrix.project }}" + env: + CI: true + + - name: Upload Playwright Report + uses: actions/upload-artifact@v4 + if: failure() + with: + name: playwright-report + path: tests/e2e/playwright-report/ + retention-days: 30 + + - name: Upload test screenshots + uses: actions/upload-artifact@v4 + if: failure() + with: + name: test-screenshots + path: tests/e2e/test-screenshots/ + retention-days: 30 diff --git a/crates/kiko-backend/benches/pubsub.rs b/crates/kiko-backend/benches/pubsub.rs index dbbda27..338d8dd 100644 --- a/crates/kiko-backend/benches/pubsub.rs +++ b/crates/kiko-backend/benches/pubsub.rs @@ -6,13 +6,13 @@ use kiko::{data::SessionMessage, id::SessionId}; use kiko_backend::messaging::PubSub; fn bench_pubsub_subscribe(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); c.bench_function("pubsub_subscribe_single", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) .iter(|| async { let session_id = SessionId::new(); - black_box(pubsub.subscribe(session_id).await) + black_box(pubsub.subscribe(session_id)) }); }); @@ -30,7 +30,7 @@ fn bench_pubsub_subscribe(c: &mut Criterion) { let pubsub = pubsub.clone(); async move { let session_id = SessionId::new(); - pubsub.subscribe(session_id).await + pubsub.subscribe(session_id) } }) .collect(); @@ -44,7 +44,7 @@ fn bench_pubsub_subscribe(c: &mut Criterion) { } fn bench_pubsub_publish(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); let session_id = SessionId::new(); let message = SessionMessage::CreateSession(kiko::data::CreateSession { @@ -55,13 +55,13 @@ fn bench_pubsub_publish(c: &mut Criterion) { // Setup subscription first let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - let _notifier = 
pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); }); c.bench_function("pubsub_publish_single", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) .iter(|| async { - pubsub.publish(session_id.clone(), message.clone()).await; + pubsub.publish(session_id.clone(), message.clone()); }); }); @@ -75,7 +75,7 @@ fn bench_pubsub_publish(c: &mut Criterion) { b.to_async(tokio::runtime::Runtime::new().unwrap()) .iter(|| async { for _ in 0..message_count { - pubsub.publish(session_id.clone(), message.clone()).await; + pubsub.publish(session_id.clone(), message.clone()); } }); }, @@ -85,7 +85,7 @@ fn bench_pubsub_publish(c: &mut Criterion) { } fn bench_pubsub_consume_events(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); let session_id = SessionId::new(); let message = SessionMessage::CreateSession(kiko::data::CreateSession { @@ -96,13 +96,13 @@ fn bench_pubsub_consume_events(c: &mut Criterion) { // Setup subscription first let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - let _notifier = pubsub.subscribe(session_id.clone()).await; - pubsub.publish(session_id.clone(), message).await; + let _notifier = pubsub.subscribe(session_id.clone()); + pubsub.publish(session_id.clone(), message); }); c.bench_function("pubsub_get_event", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter(|| async { black_box(pubsub.get_event(&session_id).await) }); + .iter(|| async { black_box(pubsub.get_event(&session_id)) }); }); let mut group = c.benchmark_group("pubsub_consume_event_cycle"); @@ -119,8 +119,8 @@ fn bench_pubsub_consume_events(c: &mut Criterion) { name: format!("Benchmark Session {i}"), duration: Duration::from_secs(3600), }); - pubsub.publish(session_id.clone(), msg).await; - black_box(pubsub.consume_event(&session_id).await); + pubsub.publish(session_id.clone(), msg); + black_box(pubsub.consume_event(&session_id)); } }); }, @@ -130,7 +130,7 
@@ fn bench_pubsub_consume_events(c: &mut Criterion) { } fn bench_pubsub_concurrent_operations(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); let mut group = c.benchmark_group("pubsub_concurrent_pub_sub"); for session_count in [10, 50, 100].iter() { @@ -146,7 +146,7 @@ fn bench_pubsub_concurrent_operations(c: &mut Criterion) { let pubsub = pubsub.clone(); async move { let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { @@ -154,8 +154,8 @@ fn bench_pubsub_concurrent_operations(c: &mut Criterion) { duration: Duration::from_secs(3600), }); - pubsub.publish(session_id.clone(), message).await; - pubsub.consume_event(&session_id).await + pubsub.publish(session_id.clone(), message); + pubsub.consume_event(&session_id) } }) .collect(); @@ -169,7 +169,7 @@ fn bench_pubsub_concurrent_operations(c: &mut Criterion) { } fn bench_pubsub_memory_efficiency(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); c.bench_function("pubsub_session_cleanup", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) @@ -177,16 +177,16 @@ fn bench_pubsub_memory_efficiency(c: &mut Criterion) { let session_ids: Vec = (0..100).map(|_| SessionId::new()).collect(); for session_id in &session_ids { - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Cleanup Test Session".to_string(), duration: Duration::from_secs(3600), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); } for session_id in &session_ids { - pubsub.cleanup_session(session_id).await; + pubsub.cleanup_session(session_id); } }); }); diff --git 
a/crates/kiko-backend/src/handlers/v1/websocket.rs b/crates/kiko-backend/src/handlers/v1/websocket.rs index 236cc03..5653524 100644 --- a/crates/kiko-backend/src/handlers/v1/websocket.rs +++ b/crates/kiko-backend/src/handlers/v1/websocket.rs @@ -87,7 +87,7 @@ async fn setup_subscription( conn_state.session_id = Some(session_id.clone()); // Subscribe to the session and get the notifier - let notifier = state.pub_sub.subscribe(session_id.clone()).await; + let notifier = state.pub_sub.subscribe(session_id.clone()); // Create a channel for sending messages to the WebSocket let (outbound_tx, rx) = mpsc::unbounded_channel::(); @@ -109,7 +109,7 @@ async fn setup_subscription( // Create the next notification listener BEFORE processing the current message notified = notifier.notified(); - if let Some(msg) = state.pub_sub.get_event(&session_id).await { + if let Some(msg) = state.pub_sub.get_event(&session_id) { match serde_json::to_string(&*msg) { Ok(json) => { log::debug!("Sending message to WebSocket: {}", json); @@ -178,7 +178,7 @@ async fn handle_join_session( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -235,7 +235,7 @@ async fn handle_remove_participant( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -278,7 +278,7 @@ async fn handle_point_session( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -314,7 +314,7 @@ async fn handle_set_topic( // 
Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -349,7 +349,7 @@ async fn handle_clear_points( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -383,7 +383,7 @@ async fn handle_toggle_hide_points( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -576,10 +576,7 @@ async fn cleanup_connection(conn_state: &mut ConnectionState, state: &Arc, pub sessions: SessionServiceInMemory, - pub pub_sub: PubSub, + pub pub_sub: PubSub, } diff --git a/crates/kiko-backend/src/messaging.rs b/crates/kiko-backend/src/messaging.rs index b8b59e2..eb871db 100644 --- a/crates/kiko-backend/src/messaging.rs +++ b/crates/kiko-backend/src/messaging.rs @@ -23,7 +23,7 @@ //! let session_id = SessionId::new(); //! //! // Subscribe to events for a session -//! let notifier = pubsub.subscribe(session_id.clone()).await; +//! let notifier = pubsub.subscribe(session_id.clone()); //! //! // Publish a message to the session //! let create_session = CreateSession { @@ -31,21 +31,29 @@ //! duration: Duration::from_secs(1800), //! }; //! let message = SessionMessage::CreateSession(create_session); -//! pubsub.publish(session_id.clone(), message).await; +//! pubsub.publish(session_id.clone(), message); //! //! // Wait for notification and consume the event //! notifier.notified().await; -//! if let Some(event) = pubsub.consume_event(&session_id).await { +//! 
if let Some(event) = pubsub.consume_event(&session_id) { //! // Process the event //! # } //! ``` -use std::{collections::HashMap, sync::Arc}; +use std::{hash::Hash, sync::Arc}; use arc_swap::ArcSwap; -use kiko::{data::SessionMessage, id::SessionId}; -use tokio::sync::{Notify, RwLock}; +use dashmap::DashMap; +use tokio::sync::Notify; + +/// Trait bounds for session identifier types. +pub trait IdType: Clone + Eq + Hash + Send + Sync {} +impl<T> IdType for T where T: Clone + Eq + Hash + Send + Sync {} + +/// Trait bounds for message types used in the PubSub system. +pub trait MessageType: Clone + Send + Sync {} +impl<T> MessageType for T where T: Clone + Send + Sync {} /// Event identifier type for tracking message sequence. /// @@ -62,38 +70,47 @@ pub type EventId = u64; /// /// # Thread Safety /// -/// All methods are async and thread-safe. The implementation uses: -/// - [`RwLock`] for protecting the notifier and event hashmaps +/// All methods are thread-safe and most are now synchronous due to lock-free design. The implementation uses: +/// - [`DashMap`] for lock-free concurrent access to notifier and event hashmaps /// - [`ArcSwap`] for lock-free atomic message updates /// - [`Arc`] for efficient cross-task notifications /// /// # Performance Characteristics /// -/// - **Subscribe**: O(1) amortized (HashMap insert) -/// - **Publish**: O(1) for message update + O(log n) for notifier lookup -/// - **Get/Consume**: O(1) lock-free read + O(log n) for HashMap access +/// - **Subscribe**: O(1) amortized (DashMap insert) +/// - **Publish**: O(1) for message update + O(1) for lock-free notifier lookup +/// - **Get/Consume**: O(1) lock-free read + O(1) for lock-free DashMap access /// - **Cleanup**: O(1) for both notifier and event removal -pub struct PubSub { +pub struct PubSub<Id, T> +where + Id: IdType, + T: MessageType, +{ /// Per-session notification handles for waking up subscribers. 
- notifiers: RwLock<HashMap<SessionId, Arc<Notify>>>, + notifiers: DashMap<Id, Arc<Notify>>, /// Per-session message storage using lock-free atomic swaps. - events: RwLock<HashMap<SessionId, ArcSwap<SessionMessage>>>, + events: DashMap<Id, ArcSwap<T>>, } -impl PubSub { +impl<Id, T> PubSub<Id, T> +where + Id: IdType, + T: MessageType, +{ /// Creates a new empty PubSub instance. /// /// # Examples /// /// ``` /// use kiko_backend::messaging::PubSub; + /// use kiko::{data::SessionMessage, id::SessionId}; /// - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub<SessionId, SessionMessage> = PubSub::new(); /// ``` pub fn new() -> Self { Self { - notifiers: RwLock::new(HashMap::new()), - events: RwLock::new(HashMap::new()), + notifiers: DashMap::new(), + events: DashMap::new(), } } @@ -123,14 +140,13 @@ impl PubSub { /// # async fn main() { /// let pubsub = Arc::new(PubSub::new()); /// let session_id = SessionId::new(); - /// let notifier = pubsub.subscribe(session_id).await; + /// let notifier = pubsub.subscribe(session_id); /// notifier.notified().await; /// # } /// ``` - pub async fn subscribe(&self, session_id: SessionId) -> Arc<Notify> { - let mut notifiers = self.notifiers.write().await; - notifiers - .entry(session_id) + pub fn subscribe(&self, id: Id) -> Arc<Notify> { + self.notifiers + .entry(id) .or_insert_with(|| Arc::new(Notify::new())) .clone() } @@ -161,7 +177,7 @@ impl PubSub { /// let session_id = SessionId::new(); /// /// // First subscribe to ensure message isn't discarded - /// let _notifier = pubsub.subscribe(session_id.clone()).await; + /// let _notifier = pubsub.subscribe(session_id.clone()); /// /// // Then publish a message /// let create_session = CreateSession { @@ -169,21 +185,18 @@ /// duration: Duration::from_secs(1800), /// }; /// let message = SessionMessage::CreateSession(create_session); - /// pubsub.publish(session_id, message).await; + /// pubsub.publish(session_id, message); /// # } /// ``` - pub async fn publish(&self, session_id: SessionId, message: SessionMessage) { + pub fn publish(&self, id: Id, message: T) { // Get the notifier first - let notifier = 
self.notifiers.read().await.get(&session_id).cloned(); + let notifier = self.notifiers.get(&id).map(|entry| entry.clone()); if let Some(notifier) = notifier { - // Store the message and immediately drop the write lock - { - let mut events = self.events.write().await; - events.insert(session_id, ArcSwap::from_pointee(message)); - } // Write lock dropped here + // Store the message - no locks needed with DashMap + self.events.insert(id, ArcSwap::from_pointee(message)); - // Now notify waiters - they can immediately acquire read locks + // Now notify waiters notifier.notify_waiters(); } } @@ -206,20 +219,19 @@ /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub<SessionId, SessionMessage> = PubSub::new(); /// let session_id = SessionId::new(); - /// if let Some(message) = pubsub.get_event(&session_id).await { + /// if let Some(message) = pubsub.get_event(&session_id) { /// // Process message, but it remains in the queue /// } /// # } /// ``` - pub async fn get_event(&self, session_id: &SessionId) -> Option<Arc<SessionMessage>> { - let events = self.events.read().await; - events.get(session_id).map(|arc_swap| arc_swap.load_full()) + pub fn get_event(&self, id: &Id) -> Option<Arc<T>> { + self.events.get(id).map(|entry| entry.load_full()) } /// Consumes and removes the current message for a session. 
@@ -241,22 +253,21 @@ impl PubSub { /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub<SessionId, SessionMessage> = PubSub::new(); /// let session_id = SessionId::new(); - /// if let Some(message) = pubsub.consume_event(&session_id).await { + /// if let Some(message) = pubsub.consume_event(&session_id) { /// // Process message - it's no longer in the queue /// } /// # } /// ``` - pub async fn consume_event(&self, session_id: &SessionId) -> Option<Arc<SessionMessage>> { - let mut events = self.events.write().await; - events - .remove(session_id) - .map(|arc_swap| arc_swap.load_full()) + pub fn consume_event(&self, id: &Id) -> Option<Arc<T>> { + self.events + .remove(id) + .map(|(_, arc_swap)| arc_swap.load_full()) } /// Completely removes all data associated with a session. @@ -273,20 +284,18 @@ /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub<SessionId, SessionMessage> = PubSub::new(); /// let session_id = SessionId::new(); - /// pubsub.cleanup_session(&session_id).await; + /// pubsub.cleanup_session(&session_id); /// # } /// ``` - pub async fn cleanup_session(&self, session_id: &SessionId) { - let mut events = self.events.write().await; - let mut notifiers = self.notifiers.write().await; - events.remove(session_id); - notifiers.remove(session_id); + pub fn cleanup_session(&self, id: &Id) { + self.events.remove(id); + self.notifiers.remove(id); } /// Returns the number of active sessions with subscribers. 
@@ -302,19 +311,19 @@ impl PubSub { /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); - /// assert_eq!(pubsub.session_count().await, 0); + /// let pubsub: PubSub<SessionId, SessionMessage> = PubSub::new(); + /// assert_eq!(pubsub.session_count(), 0); /// let session_id = SessionId::new(); - /// let _notifier = pubsub.subscribe(session_id).await; - /// assert_eq!(pubsub.session_count().await, 1); + /// let _notifier = pubsub.subscribe(session_id); + /// assert_eq!(pubsub.session_count(), 1); /// # } /// ``` - pub async fn session_count(&self) -> usize { - self.notifiers.read().await.len() + pub fn session_count(&self) -> usize { + self.notifiers.len() } /// Checks if a session has a pending message without retrieving it. @@ -335,25 +344,28 @@ /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub<SessionId, SessionMessage> = PubSub::new(); /// let session_id = SessionId::new(); - /// if pubsub.has_event(&session_id).await { - /// let message = pubsub.consume_event(&session_id).await; + /// if pubsub.has_event(&session_id) { + /// let message = pubsub.consume_event(&session_id); /// // Process the message /// } /// # } /// ``` - pub async fn has_event(&self, session_id: &SessionId) -> bool { - let events = self.events.read().await; - events.contains_key(session_id) + pub fn has_event(&self, id: &Id) -> bool { + self.events.contains_key(id) } } -impl Default for PubSub { +impl<Id, T> Default for PubSub<Id, T> +where + Id: IdType, + T: MessageType, +{ /// Creates a default PubSub instance. /// /// Equivalent to calling [`PubSub::new()`]. 
@@ -372,93 +384,93 @@ mod tests { #[tokio::test] async fn consume_event_removes_message() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Test Message".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // First consume should return the message - let event = pubsub.consume_event(&session_id).await; + let event = pubsub.consume_event(&session_id); assert!(event.is_some()); // Second consume should return None (message was removed) - let event2 = pubsub.consume_event(&session_id).await; + let event2 = pubsub.consume_event(&session_id); assert!(event2.is_none()); // get_event should also return None now - let event3 = pubsub.get_event(&session_id).await; + let event3 = pubsub.get_event(&session_id); assert!(event3.is_none()); } #[tokio::test] async fn get_event_vs_consume_event() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Persistent Test".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // get_event should not remove the message - let event1 = pubsub.get_event(&session_id).await; + let event1 = pubsub.get_event(&session_id); assert!(event1.is_some()); - let event2 = pubsub.get_event(&session_id).await; + let event2 = pubsub.get_event(&session_id); assert!(event2.is_some()); // consume_event should remove it - let event3 = 
pubsub.consume_event(&session_id).await; + let event3 = pubsub.consume_event(&session_id); assert!(event3.is_some()); // Now it should be gone - let event4 = pubsub.get_event(&session_id).await; + let event4 = pubsub.get_event(&session_id); assert!(event4.is_none()); } #[tokio::test] async fn cleanup_session() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Cleanup Test".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // Verify session exists - assert_eq!(pubsub.session_count().await, 1); - assert!(pubsub.has_event(&session_id).await); + assert_eq!(pubsub.session_count(), 1); + assert!(pubsub.has_event(&session_id)); // Cleanup session - pubsub.cleanup_session(&session_id).await; + pubsub.cleanup_session(&session_id); // Verify session is gone - assert_eq!(pubsub.session_count().await, 0); - assert!(!pubsub.has_event(&session_id).await); + assert_eq!(pubsub.session_count(), 0); + assert!(!pubsub.has_event(&session_id)); - let event = pubsub.get_event(&session_id).await; + let event = pubsub.get_event(&session_id); assert!(event.is_none()); } #[tokio::test] async fn memory_leak_prevention() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let mut session_ids = Vec::new(); // Create many sessions @@ -466,64 +478,64 @@ mod tests { let session_id = SessionId::new(); session_ids.push(session_id.clone()); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: format!("Session {i}"), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id, 
message).await; + pubsub.publish(session_id, message); } - assert_eq!(pubsub.session_count().await, 100); + assert_eq!(pubsub.session_count(), 100); // Consume all events for session_id in &session_ids { - let event = pubsub.consume_event(session_id).await; + let event = pubsub.consume_event(session_id); assert!(event.is_some()); } // Events should be removed, but notifiers should still exist - assert_eq!(pubsub.session_count().await, 100); + assert_eq!(pubsub.session_count(), 100); // Clean up all sessions for session_id in &session_ids { - pubsub.cleanup_session(session_id).await; + pubsub.cleanup_session(session_id); } - assert_eq!(pubsub.session_count().await, 0); + assert_eq!(pubsub.session_count(), 0); } #[tokio::test] async fn has_event_utility() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); // No event initially - assert!(!pubsub.has_event(&session_id).await); + assert!(!pubsub.has_event(&session_id)); // Publish message let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Event Check".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // Should have event now - assert!(pubsub.has_event(&session_id).await); + assert!(pubsub.has_event(&session_id)); // Consume event - let _event = pubsub.consume_event(&session_id).await; + let _event = pubsub.consume_event(&session_id); // Should not have event anymore - assert!(!pubsub.has_event(&session_id).await); + assert!(!pubsub.has_event(&session_id)); } #[tokio::test] async fn correct_pubsub_behavior_subscribe_after_publish() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); // Publish first (no subscribers yet) - this is correct behavior @@ -532,14 +544,14 @@ 
mod tests { duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // Then subscribe - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); // Message should not be available since we subscribed after publishing // This is CORRECT pub/sub behavior - let event = pubsub.get_event(&session_id).await; + let event = pubsub.get_event(&session_id); assert!( event.is_none(), "Should not receive messages published before subscription" @@ -551,7 +563,7 @@ mod tests { let pubsub = Arc::new(PubSub::new()); let session_id = SessionId::new(); - let notifier = pubsub.subscribe(session_id.clone()).await; + let notifier = pubsub.subscribe(session_id.clone()); let pubsub_clone = pubsub.clone(); let session_id_clone = session_id.clone(); @@ -563,9 +575,7 @@ mod tests { session_id: session_id_clone.to_string(), participant_id: format!("participant_{i}"), }); - pubsub_clone - .publish(session_id_clone.clone(), message) - .await; + pubsub_clone.publish(session_id_clone.clone(), message); sleep(Duration::from_millis(50)).await; } }); @@ -580,7 +590,7 @@ mod tests { timeout(Duration::from_millis(200), notifier.notified()).await; if notification_result.is_ok() { // Process the event - if let Some(event) = pubsub.consume_event(&session_id).await { + if let Some(event) = pubsub.consume_event(&session_id) { match event.as_ref() { SessionMessage::RemoveParticipant(remove_participant) => { println!( @@ -601,25 +611,25 @@ mod tests { assert_eq!(processed_count, 3); // All events should be consumed - assert!(!pubsub.has_event(&session_id).await); + assert!(!pubsub.has_event(&session_id)); } // Keep your original working tests too #[tokio::test] async fn multiple_subscribers() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); - let _notifier1 = pubsub.subscribe(session_id.clone()).await; - let 
_notifier2 = pubsub.subscribe(session_id.clone()).await; + let _notifier1 = pubsub.subscribe(session_id.clone()); + let _notifier2 = pubsub.subscribe(session_id.clone()); let message = SessionMessage::AddParticipant(kiko::data::AddParticipant { session_id: session_id.clone().to_string(), participant_name: "participant1".to_string(), }); - pubsub.publish(session_id.clone(), message.clone()).await; - let event1 = pubsub.get_event(&session_id).await; - let event2 = pubsub.get_event(&session_id).await; + pubsub.publish(session_id.clone(), message.clone()); + let event1 = pubsub.get_event(&session_id); + let event2 = pubsub.get_event(&session_id); assert!(event1.is_some()); assert!(event2.is_some());