From 75ea5862ca4c4596d076435ba1302b6cb3789ad2 Mon Sep 17 00:00:00 2001 From: Zack Kollar Date: Thu, 25 Sep 2025 03:16:21 -0400 Subject: [PATCH 1/3] :hammer: Optimize concurrent write/reads for PubSub. - Fix benchmarks - Remove e2e test from CI/CD since they're broken for now, too much heavy lifting for non commerical GH actions --- .github/workflows/e2e-tests.yml | 226 +++++++++--------- crates/kiko-backend/benches/pubsub.rs | 32 +-- .../kiko-backend/src/handlers/v1/websocket.rs | 21 +- crates/kiko-backend/src/messaging.rs | 189 +++++++-------- 4 files changed, 228 insertions(+), 240 deletions(-) diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index 5257c1c..52e2eb5 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -2,9 +2,9 @@ name: E2E Tests on: push: - branches: [ main, develop ] + branches: [] # [ main, develop ] # disable for now, this is broken pull_request: - branches: [ main, develop ] + branches: [] # [ main, develop ] # disable for now, this is broken jobs: e2e-tests: @@ -14,115 +14,115 @@ jobs: fail-fast: false matrix: project: [chromium, firefox, webkit, "Mobile Chrome", "Mobile Safari"] - + steps: - - uses: actions/checkout@v4 - - - name: Setup Rust - uses: dtolnay/rust-toolchain@stable - with: - targets: wasm32-unknown-unknown - components: rustfmt, clippy - - - name: Cache Cargo registry and git trees - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry/index/ - ~/.cargo/registry/cache/ - ~/.cargo/git/db/ - key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-cargo-registry- - - - name: Cache Cargo binaries - uses: actions/cache@v4 - with: - path: ~/.cargo/bin/ - key: ${{ runner.os }}-cargo-bin-trunk-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-cargo-bin-trunk- - ${{ runner.os }}-cargo-bin- - - - name: Cache target directory - uses: actions/cache@v4 - with: - path: target/ - key: ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}-${{ hashFiles('**/*.rs') }} - restore-keys: | - ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}- - ${{ runner.os }}-target- - - - name: Install Rust dependencies - run: | - cargo install trunk --locked || echo "trunk already installed" - cargo install cargo-watch --locked || echo "cargo-watch already installed" - - - name: Setup Node.js - uses: actions/setup-node@v4 - with: - node-version: '20' - cache: 'npm' - cache-dependency-path: 'tests/e2e/package-lock.json' - - - name: Cache global npm packages - uses: actions/cache@v4 - with: - path: ~/.npm - key: ${{ runner.os }}-npm-global-${{ hashFiles('**/package-lock.json') }} - restore-keys: | - ${{ runner.os }}-npm-global- - - - name: Cache frontend node_modules - uses: actions/cache@v4 - with: - path: crates/kiko-frontend/node_modules - key: ${{ runner.os }}-frontend-node-modules-tailwindcss - restore-keys: | - ${{ runner.os }}-frontend-node-modules- - - - name: Install Node.js dependencies - run: | - # Install tailwindcss globally and create local node_modules in frontend - npm install -g tailwindcss - cd crates/kiko-frontend - mkdir -p node_modules - npm install tailwindcss --no-save - cd ../../tests/e2e && npm install - - - name: Cache Playwright Browsers - uses: actions/cache@v4 - with: - path: ~/.cache/ms-playwright - key: ${{ runner.os }}-playwright-${{ hashFiles('tests/e2e/package-lock.json') }} - restore-keys: | - ${{ runner.os }}-playwright- - - - name: Install Playwright Browsers - run: cd tests/e2e && npx playwright install --with-deps - - - name: Build Rust project - run: | - cargo build --release - cd crates/kiko-frontend && RUSTFLAGS="-C target-feature=-nontrapping-fptoint" trunk build --release - - - name: Run E2E tests - run: cd tests/e2e && npx playwright test --project="${{ matrix.project }}" - env: - CI: true - - - name: Upload Playwright Report - uses: actions/upload-artifact@v4 - if: failure() - with: - name: playwright-report - path: tests/e2e/playwright-report/ - retention-days: 30 - - - name: Upload test screenshots - uses: actions/upload-artifact@v4 - if: failure() - with: - name: test-screenshots - path: tests/e2e/test-screenshots/ - retention-days: 30 \ No newline at end of file + - uses: actions/checkout@v4 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + with: + targets: wasm32-unknown-unknown + components: rustfmt, clippy + + - name: Cache Cargo registry and git trees + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache Cargo binaries + uses: actions/cache@v4 + with: + path: ~/.cargo/bin/ + key: ${{ runner.os }}-cargo-bin-trunk-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-bin-trunk- + ${{ runner.os }}-cargo-bin- + + - name: Cache target directory + uses: actions/cache@v4 + with: + path: target/ + key: ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}-${{ hashFiles('**/*.rs') }} + restore-keys: | + ${{ runner.os }}-target-${{ hashFiles('**/Cargo.lock') }}- + ${{ runner.os }}-target- + + - name: Install Rust dependencies + run: | + cargo install trunk --locked || echo "trunk already installed" + cargo install cargo-watch --locked || echo "cargo-watch already installed" + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "20" + cache: "npm" + cache-dependency-path: "tests/e2e/package-lock.json" + + - name: Cache global npm packages + uses: actions/cache@v4 + with: + path: ~/.npm + key: ${{ runner.os }}-npm-global-${{ hashFiles('**/package-lock.json') }} + restore-keys: | + ${{ runner.os }}-npm-global- + + - name: Cache frontend node_modules + uses: actions/cache@v4 + with: + path: crates/kiko-frontend/node_modules + key: ${{ runner.os }}-frontend-node-modules-tailwindcss + restore-keys: | + ${{ runner.os }}-frontend-node-modules- + + - name: Install Node.js dependencies + run: | + # Install tailwindcss globally and create local node_modules in frontend + npm install -g tailwindcss + cd crates/kiko-frontend + mkdir -p node_modules + npm install tailwindcss --no-save + cd ../../tests/e2e && npm install + + - name: Cache Playwright Browsers + uses: actions/cache@v4 + with: + path: ~/.cache/ms-playwright + key: ${{ runner.os }}-playwright-${{ hashFiles('tests/e2e/package-lock.json') }} + restore-keys: | + ${{ runner.os }}-playwright- + + - name: Install Playwright Browsers + run: cd tests/e2e && npx playwright install --with-deps + + - name: Build Rust project + run: | + cargo build --release + cd crates/kiko-frontend && RUSTFLAGS="-C target-feature=-nontrapping-fptoint" trunk build --release + + - name: Run E2E tests + run: cd tests/e2e && npx playwright test --project="${{ matrix.project }}" + env: + CI: true + + - name: Upload Playwright Report + uses: actions/upload-artifact@v4 + if: failure() + with: + name: playwright-report + path: tests/e2e/playwright-report/ + retention-days: 30 + + - name: Upload test screenshots + uses: actions/upload-artifact@v4 + if: failure() + with: + name: test-screenshots + path: tests/e2e/test-screenshots/ + retention-days: 30 diff --git a/crates/kiko-backend/benches/pubsub.rs b/crates/kiko-backend/benches/pubsub.rs index dbbda27..688b036 100644 --- a/crates/kiko-backend/benches/pubsub.rs +++ b/crates/kiko-backend/benches/pubsub.rs @@ -12,7 +12,7 @@ fn bench_pubsub_subscribe(c: &mut Criterion) { b.to_async(tokio::runtime::Runtime::new().unwrap()) .iter(|| async { let session_id = SessionId::new(); - black_box(pubsub.subscribe(session_id).await) + black_box(pubsub.subscribe(session_id)) }); }); @@ -30,7 +30,7 @@ fn bench_pubsub_subscribe(c: &mut Criterion) { let pubsub = pubsub.clone(); async move { let session_id = SessionId::new(); - pubsub.subscribe(session_id).await + pubsub.subscribe(session_id) } }) .collect(); @@ -55,13 +55,13 @@ fn bench_pubsub_publish(c: &mut Criterion) { // Setup subscription first let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); }); c.bench_function("pubsub_publish_single", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) .iter(|| async { - pubsub.publish(session_id.clone(), message.clone()).await; + pubsub.publish(session_id.clone(), message.clone()); }); }); @@ -75,7 +75,7 @@ fn bench_pubsub_publish(c: &mut Criterion) { b.to_async(tokio::runtime::Runtime::new().unwrap()) .iter(|| async { for _ in 0..message_count { - pubsub.publish(session_id.clone(), message.clone()).await; + pubsub.publish(session_id.clone(), message.clone()); } }); }, @@ -96,13 +96,13 @@ fn bench_pubsub_consume_events(c: &mut Criterion) { // Setup subscription first let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - let _notifier = pubsub.subscribe(session_id.clone()).await; - pubsub.publish(session_id.clone(), message).await; + let _notifier = pubsub.subscribe(session_id.clone()); + pubsub.publish(session_id.clone(), message); }); c.bench_function("pubsub_get_event", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter(|| async { black_box(pubsub.get_event(&session_id).await) }); + .iter(|| async { black_box(pubsub.get_event(&session_id)) }); }); let mut group = c.benchmark_group("pubsub_consume_event_cycle"); @@ -119,8 +119,8 @@ fn bench_pubsub_consume_events(c: &mut Criterion) { name: format!("Benchmark Session {i}"), duration: Duration::from_secs(3600), }); - pubsub.publish(session_id.clone(), msg).await; - black_box(pubsub.consume_event(&session_id).await); + pubsub.publish(session_id.clone(), msg); + black_box(pubsub.consume_event(&session_id)); } }); }, @@ -146,7 +146,7 @@ fn bench_pubsub_concurrent_operations(c: &mut Criterion) { let pubsub = pubsub.clone(); async move { let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { @@ -154,8 +154,8 @@ fn bench_pubsub_concurrent_operations(c: &mut Criterion) { duration: Duration::from_secs(3600), }); - pubsub.publish(session_id.clone(), message).await; - pubsub.consume_event(&session_id).await + pubsub.publish(session_id.clone(), message); + pubsub.consume_event(&session_id) } }) .collect(); @@ -177,16 +177,16 @@ fn bench_pubsub_memory_efficiency(c: &mut Criterion) { let session_ids: Vec = (0..100).map(|_| SessionId::new()).collect(); for session_id in &session_ids { - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Cleanup Test Session".to_string(), duration: Duration::from_secs(3600), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); } for session_id in &session_ids { - pubsub.cleanup_session(session_id).await; + pubsub.cleanup_session(session_id); } }); }); diff --git a/crates/kiko-backend/src/handlers/v1/websocket.rs b/crates/kiko-backend/src/handlers/v1/websocket.rs index 236cc03..5653524 100644 --- a/crates/kiko-backend/src/handlers/v1/websocket.rs +++ b/crates/kiko-backend/src/handlers/v1/websocket.rs @@ -87,7 +87,7 @@ async fn setup_subscription( conn_state.session_id = Some(session_id.clone()); // Subscribe to the session and get the notifier - let notifier = state.pub_sub.subscribe(session_id.clone()).await; + let notifier = state.pub_sub.subscribe(session_id.clone()); // Create a channel for sending messages to the WebSocket let (outbound_tx, rx) = mpsc::unbounded_channel::(); @@ -109,7 +109,7 @@ async fn setup_subscription( // Create the next notification listener BEFORE processing the current message notified = notifier.notified(); - if let Some(msg) = state.pub_sub.get_event(&session_id).await { + if let Some(msg) = state.pub_sub.get_event(&session_id) { match serde_json::to_string(&*msg) { Ok(json) => { log::debug!("Sending message to WebSocket: {}", json); @@ -178,7 +178,7 @@ async fn handle_join_session( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -235,7 +235,7 @@ async fn handle_remove_participant( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -278,7 +278,7 @@ async fn handle_point_session( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -314,7 +314,7 @@ async fn handle_set_topic( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -349,7 +349,7 @@ async fn handle_clear_points( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -383,7 +383,7 @@ async fn handle_toggle_hide_points( // Broadcast the updated session to all subscribers let update_message = SessionMessage::SessionUpdate(session); - state.pub_sub.publish(session_id, update_message).await; + state.pub_sub.publish(session_id, update_message); Ok(WebSocketResponse::None) } @@ -576,10 +576,7 @@ async fn cleanup_connection(conn_state: &mut ConnectionState, state: &Arc`] for efficient cross-task notifications /// /// # Performance Characteristics /// -/// - **Subscribe**: O(1) amortized (HashMap insert) -/// - **Publish**: O(1) for message update + O(log n) for notifier lookup -/// - **Get/Consume**: O(1) lock-free read + O(log n) for HashMap access +/// - **Subscribe**: O(1) amortized (DashMap insert) +/// - **Publish**: O(1) for message update + O(1) for lock-free notifier lookup +/// - **Get/Consume**: O(1) lock-free read + O(1) for lock-free DashMap access /// - **Cleanup**: O(1) for both notifier and event removal pub struct PubSub { /// Per-session notification handles for waking up subscribers. - notifiers: RwLock>>, + notifiers: DashMap>, /// Per-session message storage using lock-free atomic swaps. - events: RwLock>>, + events: DashMap>, } impl PubSub { @@ -92,8 +93,8 @@ impl PubSub { /// ``` pub fn new() -> Self { Self { - notifiers: RwLock::new(HashMap::new()), - events: RwLock::new(HashMap::new()), + notifiers: DashMap::new(), + events: DashMap::new(), } } @@ -123,13 +124,12 @@ impl PubSub { /// # async fn main() { /// let pubsub = Arc::new(PubSub::new()); /// let session_id = SessionId::new(); - /// let notifier = pubsub.subscribe(session_id).await; + /// let notifier = pubsub.subscribe(session_id); /// notifier.notified().await; /// # } /// ``` - pub async fn subscribe(&self, session_id: SessionId) -> Arc { - let mut notifiers = self.notifiers.write().await; - notifiers + pub fn subscribe(&self, session_id: SessionId) -> Arc { + self.notifiers .entry(session_id) .or_insert_with(|| Arc::new(Notify::new())) .clone() @@ -161,7 +161,7 @@ impl PubSub { /// let session_id = SessionId::new(); /// /// // First subscribe to ensure message isn't discarded - /// let _notifier = pubsub.subscribe(session_id.clone()).await; + /// let _notifier = pubsub.subscribe(session_id.clone()); /// /// // Then publish a message /// let create_session = CreateSession { @@ -169,21 +169,19 @@ impl PubSub { /// duration: Duration::from_secs(1800), /// }; /// let message = SessionMessage::CreateSession(create_session); - /// pubsub.publish(session_id, message).await; + /// pubsub.publish(session_id, message); /// # } /// ``` - pub async fn publish(&self, session_id: SessionId, message: SessionMessage) { + pub fn publish(&self, session_id: SessionId, message: SessionMessage) { // Get the notifier first - let notifier = self.notifiers.read().await.get(&session_id).cloned(); + let notifier = self.notifiers.get(&session_id).map(|entry| entry.clone()); if let Some(notifier) = notifier { - // Store the message and immediately drop the write lock - { - let mut events = self.events.write().await; - events.insert(session_id, ArcSwap::from_pointee(message)); - } // Write lock dropped here + // Store the message - no locks needed with DashMap + self.events + .insert(session_id, ArcSwap::from_pointee(message)); - // Now notify waiters - they can immediately acquire read locks + // Now notify waiters notifier.notify_waiters(); } } @@ -212,14 +210,13 @@ impl PubSub { /// # async fn main() { /// let pubsub = PubSub::new(); /// let session_id = SessionId::new(); - /// if let Some(message) = pubsub.get_event(&session_id).await { + /// if let Some(message) = pubsub.get_event(&session_id) { /// // Process message, but it remains in the queue /// } /// # } /// ``` - pub async fn get_event(&self, session_id: &SessionId) -> Option> { - let events = self.events.read().await; - events.get(session_id).map(|arc_swap| arc_swap.load_full()) + pub fn get_event(&self, session_id: &SessionId) -> Option> { + self.events.get(session_id).map(|entry| entry.load_full()) } /// Consumes and removes the current message for a session. @@ -247,16 +244,15 @@ impl PubSub { /// # async fn main() { /// let pubsub = PubSub::new(); /// let session_id = SessionId::new(); - /// if let Some(message) = pubsub.consume_event(&session_id).await { + /// if let Some(message) = pubsub.consume_event(&session_id) { /// // Process message - it's no longer in the queue /// } /// # } /// ``` - pub async fn consume_event(&self, session_id: &SessionId) -> Option> { - let mut events = self.events.write().await; - events + pub fn consume_event(&self, session_id: &SessionId) -> Option> { + self.events .remove(session_id) - .map(|arc_swap| arc_swap.load_full()) + .map(|(_, arc_swap)| arc_swap.load_full()) } /// Completely removes all data associated with a session. @@ -279,14 +275,12 @@ impl PubSub { /// # async fn main() { /// let pubsub = PubSub::new(); /// let session_id = SessionId::new(); - /// pubsub.cleanup_session(&session_id).await; + /// pubsub.cleanup_session(&session_id); /// # } /// ``` - pub async fn cleanup_session(&self, session_id: &SessionId) { - let mut events = self.events.write().await; - let mut notifiers = self.notifiers.write().await; - events.remove(session_id); - notifiers.remove(session_id); + pub fn cleanup_session(&self, session_id: &SessionId) { + self.events.remove(session_id); + self.notifiers.remove(session_id); } /// Returns the number of active sessions with subscribers. @@ -307,14 +301,14 @@ impl PubSub { /// # #[tokio::main] /// # async fn main() { /// let pubsub = PubSub::new(); - /// assert_eq!(pubsub.session_count().await, 0); + /// assert_eq!(pubsub.session_count(), 0); /// let session_id = SessionId::new(); - /// let _notifier = pubsub.subscribe(session_id).await; - /// assert_eq!(pubsub.session_count().await, 1); + /// let _notifier = pubsub.subscribe(session_id); + /// assert_eq!(pubsub.session_count(), 1); /// # } /// ``` - pub async fn session_count(&self) -> usize { - self.notifiers.read().await.len() + pub fn session_count(&self) -> usize { + self.notifiers.len() } /// Checks if a session has a pending message without retrieving it. @@ -341,15 +335,14 @@ impl PubSub { /// # async fn main() { /// let pubsub = PubSub::new(); /// let session_id = SessionId::new(); - /// if pubsub.has_event(&session_id).await { - /// let message = pubsub.consume_event(&session_id).await; + /// if pubsub.has_event(&session_id) { + /// let message = pubsub.consume_event(&session_id); /// // Process the message /// } /// # } /// ``` - pub async fn has_event(&self, session_id: &SessionId) -> bool { - let events = self.events.read().await; - events.contains_key(session_id) + pub fn has_event(&self, session_id: &SessionId) -> bool { + self.events.contains_key(session_id) } } @@ -375,25 +368,25 @@ mod tests { let pubsub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Test Message".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // First consume should return the message - let event = pubsub.consume_event(&session_id).await; + let event = pubsub.consume_event(&session_id); assert!(event.is_some()); // Second consume should return None (message was removed) - let event2 = pubsub.consume_event(&session_id).await; + let event2 = pubsub.consume_event(&session_id); assert!(event2.is_none()); // get_event should also return None now - let event3 = pubsub.get_event(&session_id).await; + let event3 = pubsub.get_event(&session_id); assert!(event3.is_none()); } @@ -402,28 +395,28 @@ mod tests { let pubsub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Persistent Test".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // get_event should not remove the message - let event1 = pubsub.get_event(&session_id).await; + let event1 = pubsub.get_event(&session_id); assert!(event1.is_some()); - let event2 = pubsub.get_event(&session_id).await; + let event2 = pubsub.get_event(&session_id); assert!(event2.is_some()); // consume_event should remove it - let event3 = pubsub.consume_event(&session_id).await; + let event3 = pubsub.consume_event(&session_id); assert!(event3.is_some()); // Now it should be gone - let event4 = pubsub.get_event(&session_id).await; + let event4 = pubsub.get_event(&session_id); assert!(event4.is_none()); } @@ -432,27 +425,27 @@ mod tests { let pubsub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Cleanup Test".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // Verify session exists - assert_eq!(pubsub.session_count().await, 1); - assert!(pubsub.has_event(&session_id).await); + assert_eq!(pubsub.session_count(), 1); + assert!(pubsub.has_event(&session_id)); // Cleanup session - pubsub.cleanup_session(&session_id).await; + pubsub.cleanup_session(&session_id); // Verify session is gone - assert_eq!(pubsub.session_count().await, 0); - assert!(!pubsub.has_event(&session_id).await); + assert_eq!(pubsub.session_count(), 0); + assert!(!pubsub.has_event(&session_id)); - let event = pubsub.get_event(&session_id).await; + let event = pubsub.get_event(&session_id); assert!(event.is_none()); } @@ -466,32 +459,32 @@ mod tests { let session_id = SessionId::new(); session_ids.push(session_id.clone()); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: format!("Session {i}"), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id, message).await; + pubsub.publish(session_id, message); } - assert_eq!(pubsub.session_count().await, 100); + assert_eq!(pubsub.session_count(), 100); // Consume all events for session_id in &session_ids { - let event = pubsub.consume_event(session_id).await; + let event = pubsub.consume_event(session_id); assert!(event.is_some()); } // Events should be removed, but notifiers should still exist - assert_eq!(pubsub.session_count().await, 100); + assert_eq!(pubsub.session_count(), 100); // Clean up all sessions for session_id in &session_ids { - pubsub.cleanup_session(session_id).await; + pubsub.cleanup_session(session_id); } - assert_eq!(pubsub.session_count().await, 0); + assert_eq!(pubsub.session_count(), 0); } #[tokio::test] @@ -499,26 +492,26 @@ mod tests { let pubsub = PubSub::new(); let session_id = SessionId::new(); - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); // No event initially - assert!(!pubsub.has_event(&session_id).await); + assert!(!pubsub.has_event(&session_id)); // Publish message let message = SessionMessage::CreateSession(kiko::data::CreateSession { name: "Event Check".to_string(), duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // Should have event now - assert!(pubsub.has_event(&session_id).await); + assert!(pubsub.has_event(&session_id)); // Consume event - let _event = pubsub.consume_event(&session_id).await; + let _event = pubsub.consume_event(&session_id); // Should not have event anymore - assert!(!pubsub.has_event(&session_id).await); + assert!(!pubsub.has_event(&session_id)); } #[tokio::test] @@ -532,14 +525,14 @@ mod tests { duration: Duration::from_secs(1800), }); - pubsub.publish(session_id.clone(), message).await; + pubsub.publish(session_id.clone(), message); // Then subscribe - let _notifier = pubsub.subscribe(session_id.clone()).await; + let _notifier = pubsub.subscribe(session_id.clone()); // Message should not be available since we subscribed after publishing // This is CORRECT pub/sub behavior - let event = pubsub.get_event(&session_id).await; + let event = pubsub.get_event(&session_id); assert!( event.is_none(), "Should not receive messages published before subscription" @@ -551,7 +544,7 @@ mod tests { let pubsub = Arc::new(PubSub::new()); let session_id = SessionId::new(); - let notifier = pubsub.subscribe(session_id.clone()).await; + let notifier = pubsub.subscribe(session_id.clone()); let pubsub_clone = pubsub.clone(); let session_id_clone = session_id.clone(); @@ -563,9 +556,7 @@ mod tests { session_id: session_id_clone.to_string(), participant_id: format!("participant_{i}"), }); - pubsub_clone - .publish(session_id_clone.clone(), message) - .await; + pubsub_clone.publish(session_id_clone.clone(), message); sleep(Duration::from_millis(50)).await; } }); @@ -580,7 +571,7 @@ mod tests { timeout(Duration::from_millis(200), notifier.notified()).await; if notification_result.is_ok() { // Process the event - if let Some(event) = pubsub.consume_event(&session_id).await { + if let Some(event) = pubsub.consume_event(&session_id) { match event.as_ref() { SessionMessage::RemoveParticipant(remove_participant) => { println!( @@ -601,7 +592,7 @@ mod tests { assert_eq!(processed_count, 3); // All events should be consumed - assert!(!pubsub.has_event(&session_id).await); + assert!(!pubsub.has_event(&session_id)); } // Keep your original working tests too @@ -609,17 +600,17 @@ mod tests { async fn multiple_subscribers() { let pubsub = PubSub::new(); let session_id = SessionId::new(); - let _notifier1 = pubsub.subscribe(session_id.clone()).await; - let _notifier2 = pubsub.subscribe(session_id.clone()).await; + let _notifier1 = pubsub.subscribe(session_id.clone()); + let _notifier2 = pubsub.subscribe(session_id.clone()); let message = SessionMessage::AddParticipant(kiko::data::AddParticipant { session_id: session_id.clone().to_string(), participant_name: "participant1".to_string(), }); - pubsub.publish(session_id.clone(), message.clone()).await; - let event1 = pubsub.get_event(&session_id).await; - let event2 = pubsub.get_event(&session_id).await; + pubsub.publish(session_id.clone(), message.clone()); + let event1 = pubsub.get_event(&session_id); + let event2 = pubsub.get_event(&session_id); assert!(event1.is_some()); assert!(event2.is_some()); From 3c4d2b68924cb0ed045ae9a65d9ef051c2d2d3c7 Mon Sep 17 00:00:00 2001 From: Zack Kollar Date: Thu, 25 Sep 2025 03:33:03 -0400 Subject: [PATCH 2/3] :hammer: Keep e2e CI/CD config, disable via GH --- .github/workflows/e2e-tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index 52e2eb5..674f53b 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -2,9 +2,9 @@ name: E2E Tests on: push: - branches: [] # [ main, develop ] # disable for now, this is broken + branches: [main, develop] pull_request: - branches: [] # [ main, develop ] # disable for now, this is broken + branches: [main, develop] jobs: e2e-tests: From 15a442816e5cf02d059b0e410e2a2ee2b8d3ee3a Mon Sep 17 00:00:00 2001 From: Zack Kollar Date: Thu, 25 Sep 2025 04:01:43 -0400 Subject: [PATCH 3/3] :hammer: Make PubSub generic, simplify trait bounds with aliased traits --- crates/kiko-backend/benches/pubsub.rs | 10 +-- crates/kiko-backend/src/lib.rs | 3 +- crates/kiko-backend/src/messaging.rs | 99 ++++++++++++++++----------- 3 files changed, 66 insertions(+), 46 deletions(-) diff --git a/crates/kiko-backend/benches/pubsub.rs b/crates/kiko-backend/benches/pubsub.rs index 688b036..338d8dd 100644 --- a/crates/kiko-backend/benches/pubsub.rs +++ b/crates/kiko-backend/benches/pubsub.rs @@ -6,7 +6,7 @@ use kiko::{data::SessionMessage, id::SessionId}; use kiko_backend::messaging::PubSub; fn bench_pubsub_subscribe(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); c.bench_function("pubsub_subscribe_single", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) @@ -44,7 +44,7 @@ fn bench_pubsub_subscribe(c: &mut Criterion) { } fn bench_pubsub_publish(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); let session_id = SessionId::new(); let message = SessionMessage::CreateSession(kiko::data::CreateSession { @@ -85,7 +85,7 @@ fn bench_pubsub_publish(c: &mut Criterion) { } fn bench_pubsub_consume_events(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); let session_id = SessionId::new(); let message = SessionMessage::CreateSession(kiko::data::CreateSession { @@ -130,7 +130,7 @@ fn bench_pubsub_consume_events(c: &mut Criterion) { } fn bench_pubsub_concurrent_operations(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); let mut group = c.benchmark_group("pubsub_concurrent_pub_sub"); for session_count in [10, 50, 100].iter() { @@ -169,7 +169,7 @@ fn bench_pubsub_concurrent_operations(c: &mut Criterion) { } fn bench_pubsub_memory_efficiency(c: &mut Criterion) { - let pubsub = Arc::new(PubSub::new()); + let pubsub = Arc::new(PubSub::::new()); c.bench_function("pubsub_session_cleanup", |b| { b.to_async(tokio::runtime::Runtime::new().unwrap()) diff --git a/crates/kiko-backend/src/lib.rs b/crates/kiko-backend/src/lib.rs index ef1b238..86ba763 100644 --- a/crates/kiko-backend/src/lib.rs +++ b/crates/kiko-backend/src/lib.rs @@ -4,10 +4,11 @@ pub mod services; use crate::{messaging::PubSub, services::SessionServiceInMemory}; use chrono::DateTime; +use kiko::{data::SessionMessage, id::SessionId}; /// Shared application state containing services and configuration. pub struct AppState { pub started_at: DateTime, pub sessions: SessionServiceInMemory, - pub pub_sub: PubSub, + pub pub_sub: PubSub, } diff --git a/crates/kiko-backend/src/messaging.rs b/crates/kiko-backend/src/messaging.rs index 7846cb3..eb871db 100644 --- a/crates/kiko-backend/src/messaging.rs +++ b/crates/kiko-backend/src/messaging.rs @@ -41,13 +41,20 @@ //! # } //! ``` -use std::sync::Arc; +use std::{hash::Hash, sync::Arc}; use arc_swap::ArcSwap; use dashmap::DashMap; -use kiko::{data::SessionMessage, id::SessionId}; use tokio::sync::Notify; +/// Trait bounds for session identifier types. +pub trait IdType: Clone + Eq + Hash + Send + Sync {} +impl IdType for T where T: Clone + Eq + Hash + Send + Sync {} + +/// Trait bounds for message types used in the PubSub system. +pub trait MessageType: Clone + Send + Sync {} +impl MessageType for T where T: Clone + Send + Sync {} + /// Event identifier type for tracking message sequence. /// /// Currently unused but reserved for future message ordering and deduplication features. @@ -74,22 +81,31 @@ pub type EventId = u64; /// - **Publish**: O(1) for message update + O(1) for lock-free notifier lookup /// - **Get/Consume**: O(1) lock-free read + O(1) for lock-free DashMap access /// - **Cleanup**: O(1) for both notifier and event removal -pub struct PubSub { +pub struct PubSub +where + Id: IdType, + T: MessageType, +{ /// Per-session notification handles for waking up subscribers. - notifiers: DashMap>, + notifiers: DashMap>, /// Per-session message storage using lock-free atomic swaps. - events: DashMap>, + events: DashMap>, } -impl PubSub { +impl PubSub +where + Id: IdType, + T: MessageType, +{ /// Creates a new empty PubSub instance. /// /// # Examples /// /// ``` /// use kiko_backend::messaging::PubSub; + /// use kiko::{data::SessionMessage, id::SessionId}; /// - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub = PubSub::new(); /// ``` pub fn new() -> Self { Self { @@ -128,9 +144,9 @@ impl PubSub { /// notifier.notified().await; /// # } /// ``` - pub fn subscribe(&self, session_id: SessionId) -> Arc { + pub fn subscribe(&self, id: Id) -> Arc { self.notifiers - .entry(session_id) + .entry(id) .or_insert_with(|| Arc::new(Notify::new())) .clone() } @@ -172,14 +188,13 @@ impl PubSub { /// pubsub.publish(session_id, message); /// # } /// ``` - pub fn publish(&self, session_id: SessionId, message: SessionMessage) { + pub fn publish(&self, id: Id, message: T) { // Get the notifier first - let notifier = self.notifiers.get(&session_id).map(|entry| entry.clone()); + let notifier = self.notifiers.get(&id).map(|entry| entry.clone()); if let Some(notifier) = notifier { // Store the message - no locks needed with DashMap - self.events - .insert(session_id, ArcSwap::from_pointee(message)); + self.events.insert(id, ArcSwap::from_pointee(message)); // Now notify waiters notifier.notify_waiters(); @@ -204,19 +219,19 @@ impl PubSub { /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub = PubSub::new(); /// let session_id = SessionId::new(); /// if let Some(message) = pubsub.get_event(&session_id) { /// // Process message, but it remains in the queue /// } /// # } /// ``` - pub fn get_event(&self, session_id: &SessionId) -> Option> { - self.events.get(session_id).map(|entry| entry.load_full()) + pub fn get_event(&self, id: &Id) -> Option> { + self.events.get(id).map(|entry| entry.load_full()) } /// Consumes and removes the current message for a session. @@ -238,20 +253,20 @@ impl PubSub { /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub = PubSub::new(); /// let session_id = SessionId::new(); /// if let Some(message) = pubsub.consume_event(&session_id) { /// // Process message - it's no longer in the queue /// } /// # } /// ``` - pub fn consume_event(&self, session_id: &SessionId) -> Option> { + pub fn consume_event(&self, id: &Id) -> Option> { self.events - .remove(session_id) + .remove(id) .map(|(_, arc_swap)| arc_swap.load_full()) } @@ -269,18 +284,18 @@ impl PubSub { /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub = PubSub::new(); /// let session_id = SessionId::new(); /// pubsub.cleanup_session(&session_id); /// # } /// ``` - pub fn cleanup_session(&self, session_id: &SessionId) { - self.events.remove(session_id); - self.notifiers.remove(session_id); + pub fn cleanup_session(&self, id: &Id) { + self.events.remove(id); + self.notifiers.remove(id); } /// Returns the number of active sessions with subscribers. @@ -296,11 +311,11 @@ impl PubSub { /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub = PubSub::new(); /// assert_eq!(pubsub.session_count(), 0); /// let session_id = SessionId::new(); /// let _notifier = pubsub.subscribe(session_id); @@ -329,11 +344,11 @@ impl PubSub { /// /// ``` /// use kiko_backend::messaging::PubSub; - /// use kiko::id::SessionId; + /// use kiko::{data::SessionMessage, id::SessionId}; /// /// # #[tokio::main] /// # async fn main() { - /// let pubsub = PubSub::new(); + /// let pubsub: PubSub = PubSub::new(); /// let session_id = SessionId::new(); /// if pubsub.has_event(&session_id) { /// let message = pubsub.consume_event(&session_id); @@ -341,12 +356,16 @@ impl PubSub { /// } /// # } /// ``` - pub fn has_event(&self, session_id: &SessionId) -> bool { - self.events.contains_key(session_id) + pub fn has_event(&self, id: &Id) -> bool { + self.events.contains_key(id) } } -impl Default for PubSub { +impl Default for PubSub +where + Id: IdType, + T: MessageType, +{ /// Creates a default PubSub instance. /// /// Equivalent to calling [`PubSub::new()`]. @@ -365,7 +384,7 @@ mod tests { #[tokio::test] async fn consume_event_removes_message() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); let _notifier = pubsub.subscribe(session_id.clone()); @@ -392,7 +411,7 @@ mod tests { #[tokio::test] async fn get_event_vs_consume_event() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); let _notifier = pubsub.subscribe(session_id.clone()); @@ -422,7 +441,7 @@ mod tests { #[tokio::test] async fn cleanup_session() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); let _notifier = pubsub.subscribe(session_id.clone()); @@ -451,7 +470,7 @@ mod tests { #[tokio::test] async fn memory_leak_prevention() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let mut session_ids = Vec::new(); // Create many sessions @@ -489,7 +508,7 @@ mod tests { #[tokio::test] async fn has_event_utility() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); let _notifier = pubsub.subscribe(session_id.clone()); @@ -516,7 +535,7 @@ mod tests { #[tokio::test] async fn correct_pubsub_behavior_subscribe_after_publish() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); // Publish first (no subscribers yet) - this is correct behavior @@ -598,7 +617,7 @@ mod tests { // Keep your original working tests too #[tokio::test] async fn multiple_subscribers() { - let pubsub = PubSub::new(); + let pubsub: PubSub = PubSub::new(); let session_id = SessionId::new(); let _notifier1 = pubsub.subscribe(session_id.clone()); let _notifier2 = pubsub.subscribe(session_id.clone());