From 0096fa80b387a265a851557fd67cc003a6ecee33 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 09:03:44 +0100 Subject: [PATCH 01/13] Add custom authenticator --- crates/catalog/rest/src/catalog.rs | 35 +++++++++++++++++-- crates/catalog/rest/src/client.rs | 54 +++++++++++++++++++++++++----- crates/catalog/rest/src/lib.rs | 1 + 3 files changed, 79 insertions(+), 11 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 39553f7554..823f736681 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -21,6 +21,7 @@ use std::any::Any; use std::collections::HashMap; use std::future::Future; use std::str::FromStr; +use std::sync::Arc; use async_trait::async_trait; use iceberg::io::{self, FileIO}; @@ -34,11 +35,12 @@ use reqwest::header::{ HeaderMap, HeaderName, HeaderValue, {self}, }; use reqwest::{Client, Method, StatusCode, Url}; -use tokio::sync::OnceCell; +use tokio::sync::{Mutex, OnceCell}; use typed_builder::TypedBuilder; use crate::client::{ - HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, + HttpClient, TokenAuthenticator, deserialize_catalog_response, + deserialize_unexpected_catalog_error, }; use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, @@ -327,6 +329,8 @@ pub struct RestCatalog { ctx: OnceCell, /// Extensions for the FileIOBuilder. file_io_extensions: io::Extensions, + /// Custom token authenticator (must be set before first use) + authenticator: Mutex>>, } impl RestCatalog { @@ -336,6 +340,7 @@ impl RestCatalog { user_config: config, ctx: OnceCell::new(), file_io_extensions: io::Extensions::default(), + authenticator: Mutex::new(None), } } @@ -345,11 +350,35 @@ impl RestCatalog { self } + /// Set a custom token authenticator. + /// + /// The authenticator will be used to obtain tokens instead of using static tokens + /// or OAuth credentials. This must be called before any catalog operations. + /// + /// # Example + /// ```ignore + /// let authenticator = Arc::new(MyTokenAuthenticator::new()); + /// let catalog = RestCatalogBuilder::default() + /// .load("rest", config) + /// .await? + /// .with_token_authenticator(authenticator); + /// ``` + pub async fn with_token_authenticator(self, authenticator: Arc) -> Self { + *self.authenticator.lock().await = Some(authenticator); + self + } + /// Gets the [`RestContext`] from the catalog. async fn context(&self) -> Result<&RestContext> { self.ctx .get_or_try_init(|| async { - let client = HttpClient::new(&self.user_config)?; + let mut client = HttpClient::new(&self.user_config)?; + + // Set authenticator if one was configured + if let Some(authenticator) = self.authenticator.lock().await.clone() { + client = client.with_authenticator(authenticator); + } + let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?; let config = self.user_config.clone().merge_with_config(catalog_config); let client = client.update_with(&config)?; diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 361c036bb6..0988786ac9 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +use std::sync::Arc; use http::StatusCode; use iceberg::{Error, ErrorKind, Result}; @@ -28,6 +29,17 @@ use tokio::sync::Mutex; use crate::RestCatalogConfig; use crate::types::{ErrorResponse, TokenResponse}; +/// Trait for custom token authentication. +/// +/// Implement this trait to provide custom token generation/refresh logic +/// instead of using OAuth credentials. +#[async_trait::async_trait] +pub trait TokenAuthenticator: Send + Sync + Debug { + /// Get or refresh the authentication token. + /// Called when the client needs a token for authentication. + async fn get_token(&self) -> Result; +} + pub(crate) struct HttpClient { client: Client, @@ -39,6 +51,8 @@ pub(crate) struct HttpClient { token_endpoint: String, /// The credential to be used for authentication. credential: Option<(Option, String)>, + /// Custom token authenticator (takes precedence over credential/token) + authenticator: Option>, /// Extra headers to be added to each request. extra_headers: HeaderMap, /// Extra oauth parameters to be added to each authentication request. @@ -63,6 +77,7 @@ impl HttpClient { token: Mutex::new(cfg.token()), token_endpoint: cfg.get_token_endpoint(), credential: cfg.credential(), + authenticator: None, extra_headers, extra_oauth_params: cfg.extra_oauth_params(), }) @@ -86,6 +101,7 @@ impl HttpClient { self.token_endpoint }, credential: cfg.credential().or(self.credential), + authenticator: self.authenticator, extra_headers, extra_oauth_params: if !cfg.extra_oauth_params().is_empty() { cfg.extra_oauth_params() @@ -174,6 +190,16 @@ impl HttpClient { Ok(auth_res.access_token) } + /// Set a custom token authenticator. + /// + /// When set, the authenticator will be called to get tokens instead of using + /// static tokens or OAuth credentials. This allows for custom token management + /// such as reading from files, APIs, or other custom sources. + pub fn with_authenticator(mut self, authenticator: Arc) -> Self { + self.authenticator = Some(authenticator); + self + } + /// Invalidate the current token without generating a new one. On the next request, the client /// will attempt to generate a new token. pub(crate) async fn invalidate_token(&self) -> Result<()> { @@ -195,18 +221,30 @@ impl HttpClient { /// Authenticates the request by adding a bearer token to the authorization header. /// - /// This method supports three authentication modes: - /// - /// 1. **No authentication** - Skip authentication when both `credential` and `token` are missing. - /// 2. **Token authentication** - Use the provided `token` directly for authentication. - /// 3. **OAuth authentication** - Exchange `credential` for a token, cache it, then use it for authentication. + /// This method supports four authentication modes (in order of precedence): /// - /// When both `credential` and `token` are present, `token` takes precedence. + /// 1. **Custom authenticator** - If set, use the custom TokenAuthenticator to get tokens. + /// 2. **Token authentication** - Use the provided static `token` directly. + /// 3. **OAuth authentication** - Exchange `credential` for a token, cache it, then use it. + /// 4. **No authentication** - Skip authentication when none of the above are available. /// - /// # TODO: Support automatic token refreshing. + /// When an authenticator is provided, it takes precedence over static tokens and credentials. async fn authenticate(&self, req: &mut Request) -> Result<()> { + // Try authenticator first (highest priority) + if let Some(authenticator) = &self.authenticator { + let token = authenticator.get_token().await?; + req.headers_mut().insert( + http::header::AUTHORIZATION, + format!("Bearer {token}").parse().map_err(|e| { + Error::new(ErrorKind::DataInvalid, "Invalid token from authenticator!") + .with_source(e) + })?, + ); + return Ok(()); + } + // Clone the token from lock without holding the lock for entire function. - let token = self.token.lock().await.clone(); + let token: Option = self.token.lock().await.clone(); if self.credential.is_none() && token.is_none() { return Ok(()); diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs index 70cdeaabd0..1d66b92f9d 100644 --- a/crates/catalog/rest/src/lib.rs +++ b/crates/catalog/rest/src/lib.rs @@ -56,3 +56,4 @@ mod client; mod types; pub use catalog::*; +pub use client::TokenAuthenticator; From d960d6558b0cc2b11b5c3a147143ed68cef2db7f Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 09:14:03 +0100 Subject: [PATCH 02/13] Format --- crates/catalog/rest/src/catalog.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 823f736681..42dd6fd50e 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -363,7 +363,10 @@ impl RestCatalog { /// .await? /// .with_token_authenticator(authenticator); /// ``` - pub async fn with_token_authenticator(self, authenticator: Arc) -> Self { + pub async fn with_token_authenticator( + self, + authenticator: Arc, + ) -> Self { *self.authenticator.lock().await = Some(authenticator); self } From 3c253553a8bddf3e365075eb300464712f51da8e Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 09:34:28 +0100 Subject: [PATCH 03/13] . --- crates/catalog/rest/src/catalog.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 42dd6fd50e..68409cda1e 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -35,7 +35,7 @@ use reqwest::header::{ HeaderMap, HeaderName, HeaderValue, {self}, }; use reqwest::{Client, Method, StatusCode, Url}; -use tokio::sync::{Mutex, OnceCell}; +use tokio::sync::OnceCell; use typed_builder::TypedBuilder; use crate::client::{ @@ -330,7 +330,7 @@ pub struct RestCatalog { /// Extensions for the FileIOBuilder. file_io_extensions: io::Extensions, /// Custom token authenticator (must be set before first use) - authenticator: Mutex>>, + authenticator: OnceCell>, } impl RestCatalog { @@ -340,7 +340,7 @@ impl RestCatalog { user_config: config, ctx: OnceCell::new(), file_io_extensions: io::Extensions::default(), - authenticator: Mutex::new(None), + authenticator: OnceCell::new(), } } @@ -363,11 +363,11 @@ impl RestCatalog { /// .await? /// .with_token_authenticator(authenticator); /// ``` - pub async fn with_token_authenticator( + pub fn with_token_authenticator( self, authenticator: Arc, ) -> Self { - *self.authenticator.lock().await = Some(authenticator); + let _ = self.authenticator.set(authenticator); self } @@ -378,8 +378,8 @@ impl RestCatalog { let mut client = HttpClient::new(&self.user_config)?; // Set authenticator if one was configured - if let Some(authenticator) = self.authenticator.lock().await.clone() { - client = client.with_authenticator(authenticator); + if let Some(authenticator) = self.authenticator.get() { + client = client.with_authenticator(authenticator.clone()); } let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?; From 1b4dc674d11b8838e9133289c75b65571a5dae4f Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 10:17:22 +0100 Subject: [PATCH 04/13] Format --- crates/catalog/rest/src/catalog.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 68409cda1e..1739e2d981 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -363,10 +363,7 @@ impl RestCatalog { /// .await? /// .with_token_authenticator(authenticator); /// ``` - pub fn with_token_authenticator( - self, - authenticator: Arc, - ) -> Self { + pub fn with_token_authenticator(self, authenticator: Arc) -> Self { let _ = self.authenticator.set(authenticator); self } From 5a426b501d9ccca713dc0700c4d36a7a841164f0 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 10:58:32 +0100 Subject: [PATCH 05/13] Add test --- .../catalog/rest/tests/rest_catalog_test.rs | 183 ++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 59fea0b51f..790c43877b 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -449,3 +449,186 @@ async fn test_register_table() { table_registered.identifier().to_string() ); } + +#[tokio::test] +async fn test_catalog_with_custom_token_authenticator() { + let catalog = get_catalog().await; + + // Create a mock authenticator that returns a fixed token + use std::sync::{Arc, Mutex}; + + use async_trait::async_trait; + use iceberg::Result; + use iceberg_catalog_rest::TokenAuthenticator; + + #[derive(Debug)] + struct TestAuthenticator { + token: String, + call_count: Arc>, + } + + #[async_trait] + impl TokenAuthenticator for TestAuthenticator { + async fn get_token(&self) -> Result { + let mut count = self.call_count.lock().unwrap(); + *count += 1; + Ok(self.token.clone()) + } + } + + let call_count = Arc::new(Mutex::new(0)); + let authenticator = Arc::new(TestAuthenticator { + token: "test_token_123".to_string(), + call_count: call_count.clone(), + }); + + // Apply authenticator before first use + let catalog_with_auth = catalog.with_token_authenticator(authenticator); + + // Create a namespace to trigger a catalog operation + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["test_custom_auth", "namespace"]).unwrap(), + HashMap::from([("owner".to_string(), "test_user".to_string())]), + ); + + let created_ns = catalog_with_auth + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + assert_eq!(ns.name(), created_ns.name()); + assert_map_contains(ns.properties(), created_ns.properties()); + + // Verify authenticator was called + let count = *call_count.lock().unwrap(); + assert!( + count > 0, + "Authenticator should have been called at least once, but was called {} times", + count + ); +} + +#[tokio::test] +async fn test_authenticator_token_refresh() { + let catalog = get_catalog().await; + + use std::sync::{Arc, Mutex}; + + use async_trait::async_trait; + use iceberg::Result; + use iceberg_catalog_rest::TokenAuthenticator; + + // Track how many times tokens were requested + let token_request_count = Arc::new(Mutex::new(0)); + let token_request_count_clone = token_request_count.clone(); + + #[derive(Debug)] + struct CountingAuthenticator { + count: Arc>, + } + + #[async_trait] + impl TokenAuthenticator for CountingAuthenticator { + async fn get_token(&self) -> Result { + let mut c = self.count.lock().unwrap(); + *c += 1; + // Return a unique token each time to ensure dynamic generation + Ok(format!("token_{}", *c)) + } + } + + let authenticator = Arc::new(CountingAuthenticator { + count: token_request_count_clone, + }); + + let catalog_with_auth = catalog.with_token_authenticator(authenticator); + + // Perform multiple operations that should trigger token requests + let ns1 = Namespace::with_properties( + NamespaceIdent::from_strs(["test_refresh_1"]).unwrap(), + HashMap::new(), + ); + catalog_with_auth + .create_namespace(ns1.name(), HashMap::new()) + .await + .unwrap(); + + let ns2 = Namespace::with_properties( + NamespaceIdent::from_strs(["test_refresh_2"]).unwrap(), + HashMap::new(), + ); + catalog_with_auth + .create_namespace(ns2.name(), HashMap::new()) + .await + .unwrap(); + + // Verify authenticator was called multiple times + let count = *token_request_count.lock().unwrap(); + assert!( + count >= 2, + "Authenticator should have been called at least twice, but was called {} times", + count + ); +} + +#[tokio::test] +async fn test_authenticator_persists_across_operations() { + let catalog = get_catalog().await; + + use std::sync::{Arc, Mutex}; + + use async_trait::async_trait; + use iceberg::Result; + use iceberg_catalog_rest::TokenAuthenticator; + + let operation_count = Arc::new(Mutex::new(0)); + let operation_count_clone = operation_count.clone(); + + #[derive(Debug)] + struct CountingAuthenticator { + count: Arc>, + } + + #[async_trait] + impl TokenAuthenticator for CountingAuthenticator { + async fn get_token(&self) -> Result { + let mut c = self.count.lock().unwrap(); + *c += 1; + Ok("persistent_token".to_string()) + } + } + + let authenticator = Arc::new(CountingAuthenticator { + count: operation_count_clone, + }); + + let catalog_with_auth = catalog.with_token_authenticator(authenticator); + + // Create a namespace + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["test_persist", "auth"]).unwrap(), + HashMap::new(), + ); + catalog_with_auth + .create_namespace(ns.name(), HashMap::new()) + .await + .unwrap(); + + let count_after_create = *operation_count.lock().unwrap(); + + // List the namespace (should use the same authenticator) + let list_result = catalog_with_auth.list_namespaces(None).await.unwrap(); + assert!(list_result.contains(ns.name())); + + let count_after_list = *operation_count.lock().unwrap(); + + // Verify authenticator was used for both operations + assert!( + count_after_create > 0, + "Authenticator should be used for create" + ); + assert!( + count_after_list > count_after_create, + "Authenticator should be used for list operation too" + ); +} From 576458744cf37d320050b03a028fc67e77645480 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:05:43 +0100 Subject: [PATCH 06/13] . --- crates/catalog/rest/tests/rest_catalog_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 790c43877b..c4e168cf44 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -618,7 +618,7 @@ async fn test_authenticator_persists_across_operations() { // List the namespace (should use the same authenticator) let list_result = catalog_with_auth.list_namespaces(None).await.unwrap(); - assert!(list_result.contains(ns.name())); + assert!(list_result.contains(ns.name()), "Namespace {:?} not found in list {:?}", ns.name(), list_result); let count_after_list = *operation_count.lock().unwrap(); From 4b7ea30d51200f20204b40fa9544b31a59751fab Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:10:28 +0100 Subject: [PATCH 07/13] Format --- crates/catalog/rest/tests/rest_catalog_test.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index c4e168cf44..de8d3cf8c7 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -618,7 +618,12 @@ async fn test_authenticator_persists_across_operations() { // List the namespace (should use the same authenticator) let list_result = catalog_with_auth.list_namespaces(None).await.unwrap(); - assert!(list_result.contains(ns.name()), "Namespace {:?} not found in list {:?}", ns.name(), list_result); + assert!( + list_result.contains(ns.name()), + "Namespace {:?} not found in list {:?}", + ns.name(), + list_result + ); let count_after_list = *operation_count.lock().unwrap(); From 2befd50facc1fdc3f329600da08a8a879b6a66dc Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:14:52 +0100 Subject: [PATCH 08/13] . --- crates/catalog/rest/tests/rest_catalog_test.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index de8d3cf8c7..24dae8563f 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -616,10 +616,14 @@ async fn test_authenticator_persists_across_operations() { let count_after_create = *operation_count.lock().unwrap(); - // List the namespace (should use the same authenticator) - let list_result = catalog_with_auth.list_namespaces(None).await.unwrap(); + // List the namespace children (should use the same authenticator) + // We need to list children of "test_persist" to find "auth" + let list_result = catalog_with_auth + .list_namespaces(Some(&NamespaceIdent::from_strs(["test_persist"]).unwrap())) + .await + .unwrap(); assert!( - list_result.contains(ns.name()), + list_result.contains(&NamespaceIdent::from_strs(["test_persist", "auth"]).unwrap()), "Namespace {:?} not found in list {:?}", ns.name(), list_result From 0e802be6ff998b14f0565edede458e2b5958bff2 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:19:24 +0100 Subject: [PATCH 09/13] Clean up --- .../catalog/rest/tests/rest_catalog_test.rs | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 24dae8563f..6910391cd8 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -19,13 +19,19 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::RwLock; +use std::sync::{Arc, Mutex, RwLock}; +use async_trait::async_trait; use ctor::{ctor, dtor}; use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::{ApplyTransactionAction, Transaction}; -use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; -use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalog, RestCatalogBuilder}; +use iceberg::{ + Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result as IcebergResult, TableCreation, + TableIdent, +}; +use iceberg_catalog_rest::{ + REST_CATALOG_PROP_URI, RestCatalog, RestCatalogBuilder, TokenAuthenticator, +}; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; use port_scanner::scan_port_addr; @@ -455,12 +461,6 @@ async fn test_catalog_with_custom_token_authenticator() { let catalog = get_catalog().await; // Create a mock authenticator that returns a fixed token - use std::sync::{Arc, Mutex}; - - use async_trait::async_trait; - use iceberg::Result; - use iceberg_catalog_rest::TokenAuthenticator; - #[derive(Debug)] struct TestAuthenticator { token: String, @@ -469,7 +469,7 @@ async fn test_catalog_with_custom_token_authenticator() { #[async_trait] impl TokenAuthenticator for TestAuthenticator { - async fn get_token(&self) -> Result { + async fn get_token(&self) -> IcebergResult { let mut count = self.call_count.lock().unwrap(); *count += 1; Ok(self.token.clone()) @@ -512,12 +512,6 @@ async fn test_catalog_with_custom_token_authenticator() { async fn test_authenticator_token_refresh() { let catalog = get_catalog().await; - use std::sync::{Arc, Mutex}; - - use async_trait::async_trait; - use iceberg::Result; - use iceberg_catalog_rest::TokenAuthenticator; - // Track how many times tokens were requested let token_request_count = Arc::new(Mutex::new(0)); let token_request_count_clone = token_request_count.clone(); @@ -529,7 +523,7 @@ async fn test_authenticator_token_refresh() { #[async_trait] impl TokenAuthenticator for CountingAuthenticator { - async fn get_token(&self) -> Result { + async fn get_token(&self) -> IcebergResult { let mut c = self.count.lock().unwrap(); *c += 1; // Return a unique token each time to ensure dynamic generation @@ -575,12 +569,6 @@ async fn test_authenticator_token_refresh() { async fn test_authenticator_persists_across_operations() { let catalog = get_catalog().await; - use std::sync::{Arc, Mutex}; - - use async_trait::async_trait; - use iceberg::Result; - use iceberg_catalog_rest::TokenAuthenticator; - let operation_count = Arc::new(Mutex::new(0)); let operation_count_clone = operation_count.clone(); @@ -591,7 +579,7 @@ async fn test_authenticator_persists_across_operations() { #[async_trait] impl TokenAuthenticator for CountingAuthenticator { - async fn get_token(&self) -> Result { + async fn get_token(&self) -> IcebergResult { let mut c = self.count.lock().unwrap(); *c += 1; Ok("persistent_token".to_string()) From 8604c3871c81c3eb37959297c2ab48b264bb30d2 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 12:54:35 +0100 Subject: [PATCH 10/13] PR comments --- crates/catalog/rest/src/catalog.rs | 47 ++++---- crates/catalog/rest/src/client.rs | 44 ++++---- crates/catalog/rest/src/lib.rs | 2 +- .../catalog/rest/tests/rest_catalog_test.rs | 106 ++++++++---------- 4 files changed, 95 insertions(+), 104 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 1739e2d981..ce5656cfca 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -39,7 +39,7 @@ use tokio::sync::OnceCell; use typed_builder::TypedBuilder; use crate::client::{ - HttpClient, TokenAuthenticator, deserialize_catalog_response, + CustomAuthenticator, HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, }; use crate::types::{ @@ -69,6 +69,7 @@ impl Default for RestCatalogBuilder { warehouse: None, props: HashMap::new(), client: None, + authenticator: None, }) } } @@ -126,6 +127,24 @@ impl RestCatalogBuilder { self.0.client = Some(client); self } + + /// Set a custom token authenticator. + /// + /// The authenticator will be used to obtain tokens instead of using static tokens + /// or OAuth credentials. + /// + /// # Example + /// ```ignore + /// let authenticator = Arc::new(MyAuthenticator::new()); + /// let catalog = RestCatalogBuilder::default() + /// .with_token_authenticator(authenticator) + /// .load("rest", config) + /// .await?; + /// ``` + pub fn with_token_authenticator(mut self, authenticator: Arc) -> Self { + self.0.authenticator = Some(authenticator); + self + } } /// Rest catalog configuration. @@ -144,6 +163,9 @@ pub(crate) struct RestCatalogConfig { #[builder(default)] client: Option, + + #[builder(default)] + authenticator: Option>, } impl RestCatalogConfig { @@ -329,8 +351,6 @@ pub struct RestCatalog { ctx: OnceCell, /// Extensions for the FileIOBuilder. file_io_extensions: io::Extensions, - /// Custom token authenticator (must be set before first use) - authenticator: OnceCell>, } impl RestCatalog { @@ -340,7 +360,6 @@ impl RestCatalog { user_config: config, ctx: OnceCell::new(), file_io_extensions: io::Extensions::default(), - authenticator: OnceCell::new(), } } @@ -350,24 +369,6 @@ impl RestCatalog { self } - /// Set a custom token authenticator. - /// - /// The authenticator will be used to obtain tokens instead of using static tokens - /// or OAuth credentials. This must be called before any catalog operations. - /// - /// # Example - /// ```ignore - /// let authenticator = Arc::new(MyTokenAuthenticator::new()); - /// let catalog = RestCatalogBuilder::default() - /// .load("rest", config) - /// .await? - /// .with_token_authenticator(authenticator); - /// ``` - pub fn with_token_authenticator(self, authenticator: Arc) -> Self { - let _ = self.authenticator.set(authenticator); - self - } - /// Gets the [`RestContext`] from the catalog. async fn context(&self) -> Result<&RestContext> { self.ctx @@ -375,7 +376,7 @@ impl RestCatalog { let mut client = HttpClient::new(&self.user_config)?; // Set authenticator if one was configured - if let Some(authenticator) = self.authenticator.get() { + if let Some(authenticator) = &self.user_config.authenticator { client = client.with_authenticator(authenticator.clone()); } diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 0988786ac9..69319ec477 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -34,7 +34,7 @@ use crate::types::{ErrorResponse, TokenResponse}; /// Implement this trait to provide custom token generation/refresh logic /// instead of using OAuth credentials. #[async_trait::async_trait] -pub trait TokenAuthenticator: Send + Sync + Debug { +pub trait CustomAuthenticator: Send + Sync + Debug { /// Get or refresh the authentication token. /// Called when the client needs a token for authentication. async fn get_token(&self) -> Result; @@ -52,7 +52,7 @@ pub(crate) struct HttpClient { /// The credential to be used for authentication. credential: Option<(Option, String)>, /// Custom token authenticator (takes precedence over credential/token) - authenticator: Option>, + authenticator: Option>, /// Extra headers to be added to each request. extra_headers: HeaderMap, /// Extra oauth parameters to be added to each authentication request. @@ -195,11 +195,26 @@ impl HttpClient { /// When set, the authenticator will be called to get tokens instead of using /// static tokens or OAuth credentials. This allows for custom token management /// such as reading from files, APIs, or other custom sources. - pub fn with_authenticator(mut self, authenticator: Arc) -> Self { + pub fn with_authenticator(mut self, authenticator: Arc) -> Self { self.authenticator = Some(authenticator); self } + /// Add bearer token to request authorization header. + fn set_bearer_token(req: &mut Request, token: &str) -> Result<()> { + req.headers_mut().insert( + http::header::AUTHORIZATION, + format!("Bearer {token}").parse().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid token received from catalog server!", + ) + .with_source(e) + })?, + ); + Ok(()) + } + /// Invalidate the current token without generating a new one. On the next request, the client /// will attempt to generate a new token. pub(crate) async fn invalidate_token(&self) -> Result<()> { @@ -223,7 +238,7 @@ impl HttpClient { /// /// This method supports four authentication modes (in order of precedence): /// - /// 1. **Custom authenticator** - If set, use the custom TokenAuthenticator to get tokens. + /// 1. **Custom authenticator** - If set, use the custom CustomAuthenticator to get tokens. /// 2. **Token authentication** - Use the provided static `token` directly. /// 3. **OAuth authentication** - Exchange `credential` for a token, cache it, then use it. /// 4. **No authentication** - Skip authentication when none of the above are available. @@ -233,13 +248,7 @@ impl HttpClient { // Try authenticator first (highest priority) if let Some(authenticator) = &self.authenticator { let token = authenticator.get_token().await?; - req.headers_mut().insert( - http::header::AUTHORIZATION, - format!("Bearer {token}").parse().map_err(|e| { - Error::new(ErrorKind::DataInvalid, "Invalid token from authenticator!") - .with_source(e) - })?, - ); + Self::set_bearer_token(req, &token)?; return Ok(()); } @@ -262,18 +271,7 @@ impl HttpClient { } }; - // Insert token in request. - req.headers_mut().insert( - http::header::AUTHORIZATION, - format!("Bearer {token}").parse().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - "Invalid token received from catalog server!", - ) - .with_source(e) - })?, - ); - + Self::set_bearer_token(req, &token)?; Ok(()) } diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs index 1d66b92f9d..c8e1b98877 100644 --- a/crates/catalog/rest/src/lib.rs +++ b/crates/catalog/rest/src/lib.rs @@ -56,4 +56,4 @@ mod client; mod types; pub use catalog::*; -pub use client::TokenAuthenticator; +pub use client::CustomAuthenticator; diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 6910391cd8..1e34352187 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -457,61 +457,21 @@ async fn test_register_table() { } #[tokio::test] -async fn test_catalog_with_custom_token_authenticator() { - let catalog = get_catalog().await; +async fn test_authenticator_token_refresh() { + set_up(); - // Create a mock authenticator that returns a fixed token - #[derive(Debug)] - struct TestAuthenticator { - token: String, - call_count: Arc>, - } + let rest_catalog_ip = { + let guard = DOCKER_COMPOSE_ENV.read().unwrap(); + let docker_compose = guard.as_ref().unwrap(); + docker_compose.get_container_ip("rest") + }; - #[async_trait] - impl TokenAuthenticator for TestAuthenticator { - async fn get_token(&self) -> IcebergResult { - let mut count = self.call_count.lock().unwrap(); - *count += 1; - Ok(self.token.clone()) - } + let rest_socket_addr = SocketAddr::new(rest_catalog_ip, REST_CATALOG_PORT); + while !scan_port_addr(rest_socket_addr) { + info!("Waiting for 1s rest catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; } - let call_count = Arc::new(Mutex::new(0)); - let authenticator = Arc::new(TestAuthenticator { - token: "test_token_123".to_string(), - call_count: call_count.clone(), - }); - - // Apply authenticator before first use - let catalog_with_auth = catalog.with_token_authenticator(authenticator); - - // Create a namespace to trigger a catalog operation - let ns = Namespace::with_properties( - NamespaceIdent::from_strs(["test_custom_auth", "namespace"]).unwrap(), - HashMap::from([("owner".to_string(), "test_user".to_string())]), - ); - - let created_ns = catalog_with_auth - .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); - - assert_eq!(ns.name(), created_ns.name()); - assert_map_contains(ns.properties(), created_ns.properties()); - - // Verify authenticator was called - let count = *call_count.lock().unwrap(); - assert!( - count > 0, - "Authenticator should have been called at least once, but was called {} times", - count - ); -} - -#[tokio::test] -async fn test_authenticator_token_refresh() { - let catalog = get_catalog().await; - // Track how many times tokens were requested let token_request_count = Arc::new(Mutex::new(0)); let token_request_count_clone = token_request_count.clone(); @@ -522,7 +482,7 @@ async fn test_authenticator_token_refresh() { } #[async_trait] - impl TokenAuthenticator for CountingAuthenticator { + impl CustomAuthenticator for CountingAuthenticator { async fn get_token(&self) -> IcebergResult { let mut c = self.count.lock().unwrap(); *c += 1; @@ -535,7 +495,17 @@ async fn test_authenticator_token_refresh() { count: token_request_count_clone, }); - let catalog_with_auth = catalog.with_token_authenticator(authenticator); + let catalog_with_auth = RestCatalogBuilder::default() + .with_token_authenticator(authenticator) + .load( + "rest", + HashMap::from([( + REST_CATALOG_PROP_URI.to_string(), + format!("http://{rest_socket_addr}"), + )]), + ) + .await + .unwrap(); // Perform multiple operations that should trigger token requests let ns1 = Namespace::with_properties( @@ -559,7 +529,7 @@ async fn test_authenticator_token_refresh() { // Verify authenticator was called multiple times let count = *token_request_count.lock().unwrap(); assert!( - count >= 2, + count == 2, "Authenticator should have been called at least twice, but was called {} times", count ); @@ -567,7 +537,19 @@ async fn test_authenticator_token_refresh() { #[tokio::test] async fn test_authenticator_persists_across_operations() { - let catalog = get_catalog().await; + set_up(); + + let rest_catalog_ip = { + let guard = DOCKER_COMPOSE_ENV.read().unwrap(); + let docker_compose = guard.as_ref().unwrap(); + docker_compose.get_container_ip("rest") + }; + + let rest_socket_addr = SocketAddr::new(rest_catalog_ip, REST_CATALOG_PORT); + while !scan_port_addr(rest_socket_addr) { + info!("Waiting for 1s rest catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } let operation_count = Arc::new(Mutex::new(0)); let operation_count_clone = operation_count.clone(); @@ -578,7 +560,7 @@ async fn test_authenticator_persists_across_operations() { } #[async_trait] - impl TokenAuthenticator for CountingAuthenticator { + impl CustomAuthenticator for CountingAuthenticator { async fn get_token(&self) -> IcebergResult { let mut c = self.count.lock().unwrap(); *c += 1; @@ -590,7 +572,17 @@ async fn test_authenticator_persists_across_operations() { count: operation_count_clone, }); - let catalog_with_auth = catalog.with_token_authenticator(authenticator); + let catalog_with_auth = RestCatalogBuilder::default() + .with_token_authenticator(authenticator) + .load( + "rest", + HashMap::from([( + REST_CATALOG_PROP_URI.to_string(), + format!("http://{rest_socket_addr}"), + )]), + ) + .await + .unwrap(); // Create a namespace let ns = Namespace::with_properties( From 6bdd66d56b824b44038d409495854b537f2a85b2 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:01:51 +0100 Subject: [PATCH 11/13] Fix trait name --- crates/catalog/rest/tests/rest_catalog_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 1e34352187..463c6ae2f8 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -30,7 +30,7 @@ use iceberg::{ TableIdent, }; use iceberg_catalog_rest::{ - REST_CATALOG_PROP_URI, RestCatalog, RestCatalogBuilder, TokenAuthenticator, + CustomAuthenticator, REST_CATALOG_PROP_URI, RestCatalog, RestCatalogBuilder, }; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; From 2222ec403702c22b5436e68e2ea9c48ea2db63f8 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:10:07 +0100 Subject: [PATCH 12/13] . --- crates/catalog/rest/src/client.rs | 16 ++- .../catalog/rest/tests/rest_catalog_test.rs | 100 ++++++------------ 2 files changed, 40 insertions(+), 76 deletions(-) diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 69319ec477..e8c3307e98 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -201,16 +201,12 @@ impl HttpClient { } /// Add bearer token to request authorization header. - fn set_bearer_token(req: &mut Request, token: &str) -> Result<()> { + fn set_bearer_token(req: &mut Request, token: &str, error_msg: &str) -> Result<()> { req.headers_mut().insert( http::header::AUTHORIZATION, - format!("Bearer {token}").parse().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - "Invalid token received from catalog server!", - ) - .with_source(e) - })?, + format!("Bearer {token}") + .parse() + .map_err(|e| Error::new(ErrorKind::DataInvalid, error_msg).with_source(e))?, ); Ok(()) } @@ -248,7 +244,7 @@ impl HttpClient { // Try authenticator first (highest priority) if let Some(authenticator) = &self.authenticator { let token = authenticator.get_token().await?; - Self::set_bearer_token(req, &token)?; + Self::set_bearer_token(req, &token, "Invalid custom token")?; return Ok(()); } @@ -271,7 +267,7 @@ impl HttpClient { } }; - Self::set_bearer_token(req, &token)?; + Self::set_bearer_token(req, &token, "Invalid token received from catalog server!")?; Ok(()) } diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 463c6ae2f8..c7b74e4503 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -456,8 +456,24 @@ async fn test_register_table() { ); } -#[tokio::test] -async fn test_authenticator_token_refresh() { +#[derive(Debug)] +struct CountingAuthenticator { + count: Arc>, +} + +#[async_trait] +impl CustomAuthenticator for CountingAuthenticator { + async fn get_token(&self) -> IcebergResult { + let mut c = self.count.lock().unwrap(); + *c += 1; + // Return a unique token each time to ensure dynamic generation + Ok(format!("token_{}", *c)) + } +} + +async fn get_catalog_with_authenticator( + authenticator: Arc, +) -> RestCatalog { set_up(); let rest_catalog_ip = { @@ -472,30 +488,7 @@ async fn test_authenticator_token_refresh() { sleep(std::time::Duration::from_millis(1000)).await; } - // Track how many times tokens were requested - let token_request_count = Arc::new(Mutex::new(0)); - let token_request_count_clone = token_request_count.clone(); - - #[derive(Debug)] - struct CountingAuthenticator { - count: Arc>, - } - - #[async_trait] - impl CustomAuthenticator for CountingAuthenticator { - async fn get_token(&self) -> IcebergResult { - let mut c = self.count.lock().unwrap(); - *c += 1; - // Return a unique token each time to ensure dynamic generation - Ok(format!("token_{}", *c)) - } - } - - let authenticator = Arc::new(CountingAuthenticator { - count: token_request_count_clone, - }); - - let catalog_with_auth = RestCatalogBuilder::default() + RestCatalogBuilder::default() .with_token_authenticator(authenticator) .load( "rest", @@ -505,7 +498,20 @@ async fn test_authenticator_token_refresh() { )]), ) .await - .unwrap(); + .unwrap() +} + +#[tokio::test] +async fn test_authenticator_token_refresh() { + // Track how many times tokens were requested + let token_request_count = Arc::new(Mutex::new(0)); + let token_request_count_clone = token_request_count.clone(); + + let authenticator = Arc::new(CountingAuthenticator { + count: token_request_count_clone, + }); + + let catalog_with_auth = get_catalog_with_authenticator(authenticator).await; // Perform multiple operations that should trigger token requests let ns1 = Namespace::with_properties( @@ -537,52 +543,14 @@ async fn test_authenticator_token_refresh() { #[tokio::test] async fn test_authenticator_persists_across_operations() { - set_up(); - - let rest_catalog_ip = { - let guard = DOCKER_COMPOSE_ENV.read().unwrap(); - let docker_compose = guard.as_ref().unwrap(); - docker_compose.get_container_ip("rest") - }; - - let rest_socket_addr = SocketAddr::new(rest_catalog_ip, REST_CATALOG_PORT); - while !scan_port_addr(rest_socket_addr) { - info!("Waiting for 1s rest catalog to ready..."); - sleep(std::time::Duration::from_millis(1000)).await; - } - let operation_count = Arc::new(Mutex::new(0)); let operation_count_clone = operation_count.clone(); - #[derive(Debug)] - struct CountingAuthenticator { - count: Arc>, - } - - #[async_trait] - impl CustomAuthenticator for CountingAuthenticator { - async fn get_token(&self) -> IcebergResult { - let mut c = self.count.lock().unwrap(); - *c += 1; - Ok("persistent_token".to_string()) - } - } - let authenticator = Arc::new(CountingAuthenticator { count: operation_count_clone, }); - let catalog_with_auth = RestCatalogBuilder::default() - .with_token_authenticator(authenticator) - .load( - "rest", - HashMap::from([( - REST_CATALOG_PROP_URI.to_string(), - format!("http://{rest_socket_addr}"), - )]), - ) - .await - .unwrap(); + let catalog_with_auth = get_catalog_with_authenticator(authenticator).await; // Create a namespace let ns = Namespace::with_properties( From f39efe2cc35c3b90329a8055381c51d0c3df0b0b Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:16:05 +0100 Subject: [PATCH 13/13] . --- crates/catalog/rest/tests/rest_catalog_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index c7b74e4503..30be3f2f57 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -535,7 +535,7 @@ async fn test_authenticator_token_refresh() { // Verify authenticator was called multiple times let count = *token_request_count.lock().unwrap(); assert!( - count == 2, + count >= 2, "Authenticator should have been called at least twice, but was called {} times", count );