diff --git a/.github/workflows/test-sdk-js.yml b/.github/workflows/test-sdk-js.yml
index 775f50ef5..15c2f3cec 100644
--- a/.github/workflows/test-sdk-js.yml
+++ b/.github/workflows/test-sdk-js.yml
@@ -85,19 +85,6 @@ jobs:
          cd sdk-js
          git checkout master || git checkout main

-      - name: Patch SDK JS workflows for stability
-        run: |
-          python3 - <<'PY'
-          from pathlib import Path
-
-          path = Path("sdk-js/examples/blobs/workflows/blobs-js.yml")
-          if path.exists():
-              content = path.read_text()
-              content = content.replace("seconds: 3", "seconds: 15")
-              content = content.replace("seconds: 6", "seconds: 15")
-              path.write_text(content)
-          PY
-
      - name: Install SDK JS dependencies
        working-directory: sdk-js
        run: |
diff --git a/crates/auth/src/auth/token/jwt.rs b/crates/auth/src/auth/token/jwt.rs
index ff11972d2..f4c0644ca 100644
--- a/crates/auth/src/auth/token/jwt.rs
+++ b/crates/auth/src/auth/token/jwt.rs
@@ -320,7 +320,30 @@ impl TokenManager {
        }
    }

+    async fn touch_key_last_activity(&self, key_id: &str) -> Result<(), AuthError> {
+        // Re-fetch key to avoid overwriting revocations with stale data.
+        let Some(mut key) = self
+            .key_manager
+            .get_key(key_id)
+            .await
+            .map_err(|e| AuthError::StorageError(e.to_string()))?
+        else {
+            return Ok(());
+        };
+
+        key.metadata.touch();
+        self.key_manager
+            .set_key(key_id, &key)
+            .await
+            .map_err(|e| AuthError::StorageError(e.to_string()))?;
+
+        Ok(())
+    }
+
    /// Verify a JWT token from request headers
+    ///
+    /// This method validates the token, checks for idle timeout, and updates the
+    /// last activity timestamp to implement sliding window session management.
    pub async fn verify_token_from_headers(
        &self,
        headers: &HeaderMap,
@@ -365,6 +388,28 @@ impl TokenManager {
            return Err(AuthError::InvalidToken("Key has been revoked".to_string()));
        }

+        // Check for idle timeout - if the session has been inactive for too long, reject it
+        if key.metadata.is_idle(self.config.idle_timeout) {
+            tracing::debug!(
+                "Session for key {} has exceeded idle timeout of {} seconds",
+                claims.sub,
+                self.config.idle_timeout
+            );
+            return Err(AuthError::InvalidToken(
+                "Session has expired due to inactivity".to_string(),
+            ));
+        }
+
+        // Update last activity timestamp (sliding window expiration)
+        if let Err(e) = self.touch_key_last_activity(&claims.sub).await {
+            // Log the error but don't fail the request - activity tracking is best-effort
+            tracing::warn!(
+                "Failed to update last activity for key {}: {}",
+                claims.sub,
+                e
+            );
+        }
+
        Ok(AuthResponse {
            is_valid: true,
            key_id: claims.sub,
@@ -423,7 +468,7 @@ impl TokenManager {
        let claims = self.verify_token(refresh_token).await?;

        // Get the key and verify it's valid
-        let key = self
+        let mut key = self
            .key_manager
            .get_key(&claims.sub)
            .await
@@ -440,6 +485,31 @@ impl TokenManager {
            return Err(AuthError::InvalidToken("Key is not valid".to_string()));
        }

+        // Check for idle timeout - if the session has been inactive for too long, reject it
+        if key.metadata.is_idle(self.config.idle_timeout) {
+            tracing::debug!(
+                "Session for key {} has exceeded idle timeout of {} seconds",
+                claims.sub,
+                self.config.idle_timeout
+            );
+            return Err(AuthError::InvalidToken(
+                "Session has expired due to inactivity".to_string(),
+            ));
+        }
+
+        // Update last activity timestamp (sliding window expiration)
+        if let Err(e) = self.touch_key_last_activity(&claims.sub).await {
+            // Log the error but don't fail the refresh - activity tracking is best-effort
+            tracing::warn!(
+                "Failed to update last activity for key {}: {}",
+                claims.sub,
+                e
+            );
+        } else {
+            // Keep the in-memory key in sync for client rotation.
+            key.metadata.touch();
+        }
+
        match key.key_type {
            // For root tokens, simply generate new tokens with the same ID
            KeyType::Root => {
diff --git a/crates/auth/src/config.rs b/crates/auth/src/config.rs
index 3bfaacade..7b899a154 100644
--- a/crates/auth/src/config.rs
+++ b/crates/auth/src/config.rs
@@ -59,6 +59,11 @@ pub struct JwtConfig {
    /// Refresh token expiry time in seconds (default: 30 days)
    #[serde(default = "default_refresh_token_expiry")]
    pub refresh_token_expiry: u64,
+
+    /// Idle timeout in seconds - sessions are revoked after this period of inactivity
+    /// (default: 30 minutes, set to 0 to disable idle timeout)
+    #[serde(default = "default_idle_timeout")]
+    pub idle_timeout: u64,
}

fn default_access_token_expiry() -> u64 {
@@ -69,6 +74,10 @@ fn default_refresh_token_expiry() -> u64 {
    30 * 24 * 3600 // 30 days
}

+fn default_idle_timeout() -> u64 {
+    30 * 60 // 30 minutes
+}
+
/// Storage configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
diff --git a/crates/auth/src/embedded.rs b/crates/auth/src/embedded.rs
index 2f283975b..3795b2e26 100644
--- a/crates/auth/src/embedded.rs
+++ b/crates/auth/src/embedded.rs
@@ -101,6 +101,7 @@ pub fn default_config() -> AuthConfig {
            issuer: "calimero-auth".to_string(),
            access_token_expiry: 3600,
            refresh_token_expiry: 2592000,
+            idle_timeout: 1800, // 30 minutes
        },
        storage: StorageConfig::RocksDB {
            path: "auth".into(),
diff --git a/crates/auth/src/storage/models/keys.rs b/crates/auth/src/storage/models/keys.rs
index 76c6f7d51..c206355bc 100644
--- a/crates/auth/src/storage/models/keys.rs
+++ b/crates/auth/src/storage/models/keys.rs
@@ -234,6 +234,10 @@ pub struct KeyMetadata {
    pub created_at: u64,
    /// When the key was revoked
    pub revoked_at: Option<u64>,
+    /// When the key was last used (for idle timeout tracking)
+    /// Missing values fall back to the current time, so legacy keys are not treated as idle
+    #[serde(default)]
+    pub last_activity: Option<u64>,
}

impl Default for KeyMetadata {
@@ -245,9 +249,11 @@ impl KeyMetadata {
    /// Create new key metadata
    pub fn new() -> Self {
+        let now = Utc::now().timestamp() as u64;
        Self {
-            created_at: Utc::now().timestamp() as u64,
+            created_at: now,
            revoked_at: None,
+            last_activity: Some(now),
        }
    }
@@ -255,4 +261,131 @@
    pub fn revoke(&mut self) {
        self.revoked_at = Some(Utc::now().timestamp() as u64);
    }
+
+    /// Update the last activity timestamp
+    pub fn touch(&mut self) {
+        self.last_activity = Some(Utc::now().timestamp() as u64);
+    }
+
+    /// Get the last activity timestamp, falling back to "now" for legacy keys
+    pub fn get_last_activity(&self) -> u64 {
+        self.last_activity
+            .unwrap_or_else(|| Utc::now().timestamp() as u64)
+    }
+
+    /// Check if the key has been idle for longer than the specified timeout
+    ///
+    /// # Arguments
+    ///
+    /// * `idle_timeout_secs` - The idle timeout in seconds (0 means disabled)
+    ///
+    /// # Returns
+    ///
+    /// * `bool` - true if the key is idle (exceeded timeout), false otherwise
+    pub fn is_idle(&self, idle_timeout_secs: u64) -> bool {
+        if idle_timeout_secs == 0 {
+            return false; // Idle timeout disabled
+        }
+        let now = Utc::now().timestamp() as u64;
+        let last_activity = self.last_activity.unwrap_or(now);
+        now.saturating_sub(last_activity) > idle_timeout_secs
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_key_metadata_new() {
+        let metadata = KeyMetadata::new();
+        assert!(metadata.revoked_at.is_none());
+        assert!(metadata.last_activity.is_some());
+        // last_activity should equal created_at (both are set from the same timestamp)
+        assert_eq!(metadata.last_activity.unwrap(), metadata.created_at);
+    }
+
+    #[test]
+    fn test_key_metadata_touch() {
+        let mut metadata = KeyMetadata::new();
+        let original_activity = metadata.get_last_activity();
+        // Touch should update the last_activity
+        metadata.touch();
+        // Since we can't easily test time changes, just verify it's set
+        assert!(metadata.last_activity.is_some());
+        assert!(metadata.get_last_activity() >= original_activity);
+    }
+
+    #[test]
+    fn test_key_metadata_get_last_activity_with_value() {
+        let mut metadata = KeyMetadata::new();
+        metadata.last_activity = Some(12345);
+        assert_eq!(metadata.get_last_activity(), 12345);
+    }
+
+    #[test]
+    fn test_key_metadata_get_last_activity_fallback() {
+        let metadata = KeyMetadata {
+            created_at: 1,
+            revoked_at: None,
+            last_activity: None,
+        };
+        let before = Utc::now().timestamp() as u64;
+        let last_activity = metadata.get_last_activity();
+        let after = Utc::now().timestamp() as u64;
+        // Should fall back to a current timestamp for legacy keys
+        assert!(last_activity >= before);
+        assert!(last_activity <= after);
+    }
+
+    #[test]
+    fn test_key_metadata_is_idle_disabled() {
+        let mut metadata = KeyMetadata::new();
+        // Set last_activity to a very old timestamp
+        metadata.last_activity = Some(1);
+        // With idle_timeout of 0, should never be idle
+        assert!(!metadata.is_idle(0));
+    }
+
+    #[test]
+    fn test_key_metadata_is_idle_not_expired() {
+        let metadata = KeyMetadata::new();
+        // Just created, with a 30 minute timeout, should not be idle
+        assert!(!metadata.is_idle(30 * 60));
+    }
+
+    #[test]
+    fn test_key_metadata_is_idle_expired() {
+        let mut metadata = KeyMetadata::new();
+        // Set last_activity to 2 hours ago
+        let now = Utc::now().timestamp() as u64;
+        metadata.last_activity = Some(now.saturating_sub(2 * 60 * 60));
+        // With a 30 minute timeout, should be idle
+        assert!(metadata.is_idle(30 * 60));
+    }
+
+    #[test]
+    fn test_key_metadata_backward_compatibility() {
+        // Simulate a key from before idle timeout was added (no last_activity)
+        let metadata = KeyMetadata {
+            created_at: 1000,
+            revoked_at: None,
+            last_activity: None,
+        };
+        // Legacy keys should not be treated as idle immediately
+        assert!(!metadata.is_idle(30 * 60));
+    }
+
+    #[test]
+    fn test_key_is_valid_and_not_idle() {
+        let key = Key::new_root_key_with_permissions(
+            "test_pub_key".to_string(),
+            "near".to_string(),
+            vec!["admin".to_string()],
+            None,
+        );
+        assert!(key.is_valid());
+        // Newly created key should not be idle
+        assert!(!key.metadata.is_idle(30 * 60));
+    }
}
diff --git a/crates/node/src/sync/manager.rs b/crates/node/src/sync/manager.rs
index 275877707..6f00edfb2 100644
--- a/crates/node/src/sync/manager.rs
+++ b/crates/node/src/sync/manager.rs
@@ -16,6 +16,7 @@ use calimero_primitives::common::DIGEST_SIZE;
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use eyre::bail;
+use eyre::WrapErr;
use futures_util::stream::{self, FuturesUnordered};
use futures_util::{FutureExt, StreamExt};
use libp2p::gossipsub::TopicHash;
@@ -483,7 +484,18 @@ impl SyncManager {

        info!(%context_id, %peer_id, "Attempting to sync with peer");

-        let protocol = self.initiate_sync_inner(context_id, peer_id).await?;
+        let protocol = match self.initiate_sync_inner(context_id, peer_id).await {
+            Ok(protocol) => protocol,
+            Err(err) => {
+                warn!(
+                    %context_id,
+                    %peer_id,
+                    error = %err,
+                    "Sync attempt failed for peer"
+                );
+                return Err(err);
+            }
+        };

        let took = start.elapsed();
@@ -737,7 +749,11 @@ impl SyncManager {

        // Note: request_snapshot_sync opens its own stream, existing stream
        // will be closed when this function returns
-        match self.request_snapshot_sync(context_id, chosen_peer).await {
+        match self
+            .request_snapshot_sync(context_id, chosen_peer)
+            .await
+            .wrap_err("snapshot sync")
+        {
            Ok(result) => {
                info!(
                    %context_id,
@@ -747,6 +763,36 @@ impl SyncManager {
                    dag_heads_count = result.dag_heads.len(),
                    "Snapshot sync completed successfully"
                );
+                if !result.dag_heads.is_empty() {
+                    match self.network_client.open_stream(chosen_peer).await {
+                        Ok(mut fine_stream) => {
+                            if let Err(e) = self
+                                .fine_sync_from_boundary(
+                                    context_id,
+                                    chosen_peer,
+                                    our_identity,
+                                    &mut fine_stream,
+                                )
+                                .await
+                            {
+                                warn!(
+                                    %context_id,
+                                    %chosen_peer,
+                                    error = %e,
+                                    "Fine-sync after snapshot failed, state may be slightly behind"
+                                );
+                            }
+                        }
+                        Err(e) => {
+                            warn!(
+                                %context_id,
+                                %chosen_peer,
+                                error = %e,
+                                "Fine-sync stream open failed, state may be slightly behind"
+                            );
+                        }
+                    }
+                }
                return Ok(Some(SyncProtocol::SnapshotSync));
            }
            Err(e) => {
@@ -797,7 +843,8 @@ impl SyncManager {
        // Request DAG heads just like uninitialized nodes
        let result = self
            .request_dag_heads_and_sync(context_id, chosen_peer, our_identity, stream)
-            .await?;
+            .await
+            .wrap_err("request DAG heads and sync")?;

        // If peer had no data, return error to try next peer
        if matches!(result, SyncProtocol::None) {
@@ -844,7 +891,8 @@ impl SyncManager {
        let result = self
            .request_dag_heads_and_sync(context_id, chosen_peer, our_identity, stream)
-            .await?;
+            .await
+            .wrap_err("request DAG heads and sync")?;

        // If peer had no data or unexpected response, return error to try next peer
        if matches!(result, SyncProtocol::None) {
@@ -862,7 +910,8 @@ impl SyncManager {
        let result = self
            .request_dag_heads_and_sync(context_id, chosen_peer, our_identity, stream)
-            .await?;
+            .await
+            .wrap_err("request DAG heads and sync")?;

        // If peer had no data or unexpected response, return error to try next peer
        if matches!(result, SyncProtocol::None) {
@@ -958,10 +1007,15 @@ impl SyncManager {
            bail!("no owned identities found for context: {}", context.id);
        };

-        let mut stream = self.network_client.open_stream(chosen_peer).await?;
+        let mut stream = self
+            .network_client
+            .open_stream(chosen_peer)
+            .await
+            .wrap_err("open stream for sync")?;

        self.initiate_key_share_process(&mut context, our_identity, &mut stream)
-            .await?;
+            .await
+            .wrap_err("key share")?;

        if !self.node_client.has_blob(&blob_id)? {
            // Get size from application config if we don't have application yet
@@ -970,7 +1024,8 @@ impl SyncManager {
                .await?;

            self.initiate_blob_share_process(&context, our_identity, blob_id, size, &mut stream)
-                .await?;
+                .await
+                .wrap_err("blob share")?;

            // After blob sharing, try to install application if it doesn't exist
            if application.is_none() {
@@ -981,7 +1036,8 @@ impl SyncManager {
                    &mut context,
                    &mut application,
                )
-                .await?;
+                .await
+                .wrap_err("install bundle after blob share")?;
            }
        }
@@ -992,7 +1048,8 @@ impl SyncManager {
        // Handle DAG synchronization if needed (uninitialized or incomplete DAG)
        if let Some(result) = self
            .handle_dag_sync(context_id, &context, chosen_peer, our_identity, &mut stream)
-            .await?
+            .await
+            .wrap_err("DAG sync")?
        {
            return Ok(result);
        }
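
Illustrative only, not part of the diff above: a minimal companion test, sketched as if it lived in the `mod tests` block added to crates/auth/src/storage/models/keys.rs, showing the sliding-window behaviour that verify_token_from_headers relies on. The test name is hypothetical; it only uses APIs introduced by this change (last_activity, touch, is_idle).

    #[test]
    fn test_touch_resets_idle_window() {
        // Assumes the same `use super::*;` scope as the tests above.
        let mut metadata = KeyMetadata::new();

        // Simulate two hours of inactivity, well past a 30 minute timeout.
        let now = Utc::now().timestamp() as u64;
        metadata.last_activity = Some(now.saturating_sub(2 * 60 * 60));
        assert!(metadata.is_idle(30 * 60));

        // A successful request calls touch(), which slides the window forward,
        // so the key is no longer considered idle.
        metadata.touch();
        assert!(!metadata.is_idle(30 * 60));
    }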