From 780210361c2cc50dc67d76671c0548f76847c3bc Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Wed, 21 Jan 2026 13:15:45 +0530 Subject: [PATCH] added private link and removed tuple/Pair --- .../kusto/ingest/v2/ConfigurationClient.kt | 5 + .../kusto/ingest/v2/KustoBaseApiClient.kt | 7 +- .../v2/builders/BaseIngestClientBuilder.kt | 5 +- .../ManagedStreamingIngestClientBuilder.kt | 3 + .../v2/builders/QueuedIngestClientBuilder.kt | 3 + .../ingest/v2/common/ConfigurationCache.kt | 5 + .../kusto/ingest/v2/common/models/S2SToken.kt | 30 ++ .../v2/uploader/ContainerUploaderBase.kt | 34 ++- .../ingest/v2/common/FabricPrivateLinkTest.kt | 272 ++++++++++++++++++ 9 files changed, 349 insertions(+), 15 deletions(-) create mode 100644 ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/models/S2SToken.kt create mode 100644 ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/common/FabricPrivateLinkTest.kt diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClient.kt index 98b2f322e..1ca62fa7f 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClient.kt @@ -5,6 +5,7 @@ package com.microsoft.azure.kusto.ingest.v2 import com.azure.core.credential.TokenCredential import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails +import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse import io.ktor.http.HttpStatusCode @@ -16,12 +17,16 @@ class ConfigurationClient( override val tokenCredential: TokenCredential, override val skipSecurityChecks: Boolean = false, override val clientDetails: ClientDetails, + override val s2sTokenProvider: (suspend () -> S2SToken)? = null, + override val s2sFabricPrivateLinkAccessContext: String? = null, ) : KustoBaseApiClient( dmUrl, tokenCredential, skipSecurityChecks, clientDetails, + s2sTokenProvider = s2sTokenProvider, + s2sFabricPrivateLinkAccessContext = s2sFabricPrivateLinkAccessContext, ) { private val logger = LoggerFactory.getLogger(ConfigurationClient::class.java) diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt index a96f3c94b..c5d7e1986 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt @@ -7,6 +7,7 @@ import com.azure.core.credential.TokenRequestContext import com.microsoft.azure.kusto.ingest.v2.apis.DefaultApi import com.microsoft.azure.kusto.ingest.v2.auth.endpoints.KustoTrustedEndpoints import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails +import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken import com.microsoft.azure.kusto.ingest.v2.common.serialization.OffsetDateTimeSerializer import io.ktor.client.HttpClientConfig import io.ktor.client.plugins.DefaultRequest @@ -33,7 +34,7 @@ open class KustoBaseApiClient( open val skipSecurityChecks: Boolean = false, open val clientDetails: ClientDetails, open val clientRequestIdPrefix: String = "KIC.execute", - open val s2sTokenProvider: (suspend () -> Pair)? = null, + open val s2sTokenProvider: (suspend () -> S2SToken)? = null, open val s2sFabricPrivateLinkAccessContext: String? = null, ) { private val logger = LoggerFactory.getLogger(KustoBaseApiClient::class.java) @@ -133,10 +134,10 @@ open class KustoBaseApiClient( onRequest { request, _ -> try { // Get S2S token - val (token, scheme) = provider() + val s2sToken = provider() request.headers.append( "x-ms-s2s-actor-authorization", - "$scheme $token", + s2sToken.toHeaderValue(), ) // Add Fabric Private Link access context header diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt index 053fb3956..e086c6f47 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt @@ -8,6 +8,7 @@ import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_CONCURRENCY import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_DATA_SIZE_BYTES import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails +import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader import com.microsoft.azure.kusto.ingest.v2.uploader.ManagedUploader @@ -17,7 +18,7 @@ abstract class BaseIngestClientBuilder> { protected var clientDetails: ClientDetails? = null // Fabric Private Link support - protected var s2sTokenProvider: (suspend () -> Pair)? = null + protected var s2sTokenProvider: (suspend () -> S2SToken)? = null protected var s2sFabricPrivateLinkAccessContext: String? = null // Added properties for ingestion endpoint and authentication @@ -60,7 +61,7 @@ abstract class BaseIngestClientBuilder> { * @return This builder instance for method chaining */ fun withFabricPrivateLink( - s2sTokenProvider: suspend () -> Pair, + s2sTokenProvider: suspend () -> S2SToken, s2sFabricPrivateLinkAccessContext: String, ): T { require(s2sFabricPrivateLinkAccessContext.isNotBlank()) { diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/ManagedStreamingIngestClientBuilder.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/ManagedStreamingIngestClientBuilder.kt index cf8b882a6..ffe8f32e6 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/ManagedStreamingIngestClientBuilder.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/ManagedStreamingIngestClientBuilder.kt @@ -55,6 +55,9 @@ private constructor(private val dmUrl: String) : tokenCredential = this.tokenCredential, skipSecurityChecks = this.skipSecurityChecks, clientDetails = effectiveClientDetails, + s2sTokenProvider = this.s2sTokenProvider, + s2sFabricPrivateLinkAccessContext = + this.s2sFabricPrivateLinkAccessContext, ) val effectiveUploader = diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt index 04834ec70..313951905 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt @@ -71,6 +71,9 @@ class QueuedIngestClientBuilder private constructor(private val dmUrl: String) : tokenCredential = this.tokenCredential, skipSecurityChecks = this.skipSecurityChecks, clientDetails = effectiveClientDetails, + s2sTokenProvider = this.s2sTokenProvider, + s2sFabricPrivateLinkAccessContext = + this.s2sFabricPrivateLinkAccessContext, ) val apiClient = createApiClient( diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt index d9530f546..b450a6152 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt @@ -7,6 +7,7 @@ import com.microsoft.azure.kusto.ingest.v2.CONFIG_CACHE_DEFAULT_REFRESH_INTERVAL import com.microsoft.azure.kusto.ingest.v2.CONFIG_CACHE_DEFAULT_SKIP_SECURITY_CHECKS import com.microsoft.azure.kusto.ingest.v2.ConfigurationClient import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails +import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse import java.lang.AutoCloseable import java.time.Duration @@ -68,6 +69,8 @@ class DefaultConfigurationCache( CONFIG_CACHE_DEFAULT_SKIP_SECURITY_CHECKS, val clientDetails: ClientDetails, val configurationProvider: (suspend () -> ConfigurationResponse)? = null, + val s2sTokenProvider: (suspend () -> S2SToken)? = null, + val s2sFabricPrivateLinkAccessContext: String? = null, ) : ConfigurationCache { companion object { /** @@ -145,6 +148,8 @@ class DefaultConfigurationCache( tokenCredential!!, skipSecurityChecks!!, clientDetails, + s2sTokenProvider, + s2sFabricPrivateLinkAccessContext, ) .getConfigurationDetails() } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/models/S2SToken.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/models/S2SToken.kt new file mode 100644 index 000000000..5d22f087c --- /dev/null +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/models/S2SToken.kt @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2.common.models + +/** + * Represents an S2S (Service-to-Service) authentication token used for + * Fabric Private Link authentication. + * + * @property scheme The authentication scheme (e.g., "Bearer") + * @property token The authentication token value + */ +data class S2SToken( + val scheme: String, + val token: String, +) { + /** + * Formats the token as an HTTP Authorization header value. + * @return The formatted header value in the format "{scheme} {token}" + */ + fun toHeaderValue(): String = "$scheme $token" + + companion object { + /** + * Creates an S2SToken with the Bearer scheme. + * @param token The token value + * @return An S2SToken with scheme "Bearer" + */ + fun bearer(token: String): S2SToken = S2SToken("Bearer", token) + } +} diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index c7536f76b..929e4eec3 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -123,7 +123,7 @@ abstract class ContainerUploaderBase( } // Compress stream if needed (for non-binary, non-compressed formats) - val (uploadStream, effectiveCompressionType, compressionJob) = + val preparedStream = if (local.shouldCompress) { logger.debug( "Auto-compressing stream for {} (format: {}, original compression: {})", @@ -137,13 +137,17 @@ abstract class ContainerUploaderBase( name, availableSize, ) - Triple( - compressResult.stream, - CompressionType.GZIP, - compressResult.compressionJob, + PreparedUploadStream( + stream = compressResult.stream, + compressionType = CompressionType.GZIP, + compressionJob = compressResult.compressionJob, ) } else { - Triple(originalStream, local.compressionType, null) + PreparedUploadStream( + stream = originalStream, + compressionType = local.compressionType, + compressionJob = null, + ) } // Upload with retry policy and container cycling @@ -151,13 +155,13 @@ abstract class ContainerUploaderBase( uploadWithRetries( local = local, name = name, - stream = uploadStream, + stream = preparedStream.stream, containers = containers, - effectiveCompressionType = effectiveCompressionType, + effectiveCompressionType = preparedStream.compressionType, ) .also { // Ensure compression job completes successfully - compressionJob?.await() + preparedStream.compressionJob?.await() logger.debug( "Compression job completed successfully for {}", name, @@ -165,7 +169,7 @@ abstract class ContainerUploaderBase( } } catch (e: Exception) { // Cancel compression job if upload fails - compressionJob?.cancel() + preparedStream.compressionJob?.cancel() throw e } } @@ -262,6 +266,16 @@ abstract class ContainerUploaderBase( val compressionJob: kotlinx.coroutines.Deferred, ) + /** + * Helper class to hold prepared upload stream with its compression type + * and optional compression job. + */ + private data class PreparedUploadStream( + val stream: InputStream, + val compressionType: CompressionType, + val compressionJob: kotlinx.coroutines.Deferred?, + ) + /** * Uploads a stream with retry logic and container cycling. Uses an * incrementing counter (mod container count) for round-robin container diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/common/FabricPrivateLinkTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/common/FabricPrivateLinkTest.kt new file mode 100644 index 000000000..d90aa667d --- /dev/null +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/common/FabricPrivateLinkTest.kt @@ -0,0 +1,272 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2.common + +import com.azure.core.credential.TokenCredential +import com.microsoft.azure.kusto.ingest.v2.ConfigurationClient +import com.microsoft.azure.kusto.ingest.v2.builders.ManagedStreamingIngestClientBuilder +import com.microsoft.azure.kusto.ingest.v2.builders.QueuedIngestClientBuilder +import com.microsoft.azure.kusto.ingest.v2.builders.StreamingIngestClientBuilder +import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails +import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken +import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse +import io.mockk.mockk +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull + +/** + * Tests for Fabric Private Link (S2S authentication) support in the + * configuration cache and client builders. + */ +class FabricPrivateLinkTest { + + private val validDmUrl = "https://ingest-test.kusto.windows.net" + private val mockTokenCredential: TokenCredential = mockk(relaxed = true) + private val testAccessContext = "test-fabric-access-context" + private val testS2SScheme = "Bearer" + private val testS2SToken = "s2s-test-token" + + private val mockS2sTokenProvider: suspend () -> S2SToken = { + S2SToken(testS2SScheme, testS2SToken) + } + + // ==================== DefaultConfigurationCache Tests ==================== + + @Test + fun `DefaultConfigurationCache should accept S2S parameters`() { + val cache = + DefaultConfigurationCache( + dmUrl = validDmUrl, + tokenCredential = mockTokenCredential, + skipSecurityChecks = true, + clientDetails = ClientDetails.createDefault(), + s2sTokenProvider = mockS2sTokenProvider, + s2sFabricPrivateLinkAccessContext = testAccessContext, + ) + assertNotNull(cache) + } + + @Test + fun `DefaultConfigurationCache should work without S2S parameters`() { + val cache = + DefaultConfigurationCache( + dmUrl = validDmUrl, + tokenCredential = mockTokenCredential, + skipSecurityChecks = true, + clientDetails = ClientDetails.createDefault(), + ) + assertNotNull(cache) + } + + @Test + fun `DefaultConfigurationCache should accept only S2S token provider`() { + val cache = + DefaultConfigurationCache( + dmUrl = validDmUrl, + tokenCredential = mockTokenCredential, + skipSecurityChecks = true, + clientDetails = ClientDetails.createDefault(), + s2sTokenProvider = mockS2sTokenProvider, + ) + assertNotNull(cache) + } + + @Test + fun `DefaultConfigurationCache should accept only access context`() { + val cache = + DefaultConfigurationCache( + dmUrl = validDmUrl, + tokenCredential = mockTokenCredential, + skipSecurityChecks = true, + clientDetails = ClientDetails.createDefault(), + s2sFabricPrivateLinkAccessContext = testAccessContext, + ) + assertNotNull(cache) + } + + @Test + fun `DefaultConfigurationCache with custom provider should work with S2S parameters`() { + val mockConfigResponse: ConfigurationResponse = mockk(relaxed = true) + val customProvider: suspend () -> ConfigurationResponse = { + mockConfigResponse + } + + val cache = + DefaultConfigurationCache( + clientDetails = ClientDetails.createDefault(), + configurationProvider = customProvider, + s2sTokenProvider = mockS2sTokenProvider, + s2sFabricPrivateLinkAccessContext = testAccessContext, + ) + + runBlocking { + val config = cache.getConfiguration() + assertNotNull(config) + } + } + + // ==================== ConfigurationClient Tests ==================== + + @Test + fun `ConfigurationClient should accept S2S parameters`() { + val client = + ConfigurationClient( + dmUrl = validDmUrl, + tokenCredential = mockTokenCredential, + skipSecurityChecks = true, + clientDetails = ClientDetails.createDefault(), + s2sTokenProvider = mockS2sTokenProvider, + s2sFabricPrivateLinkAccessContext = testAccessContext, + ) + assertNotNull(client) + assertEquals(mockS2sTokenProvider, client.s2sTokenProvider) + assertEquals(testAccessContext, client.s2sFabricPrivateLinkAccessContext) + } + + @Test + fun `ConfigurationClient should work without S2S parameters`() { + val client = + ConfigurationClient( + dmUrl = validDmUrl, + tokenCredential = mockTokenCredential, + skipSecurityChecks = true, + clientDetails = ClientDetails.createDefault(), + ) + assertNotNull(client) + assertNull(client.s2sTokenProvider) + assertNull(client.s2sFabricPrivateLinkAccessContext) + } + + // ==================== QueuedIngestClientBuilder Tests ==================== + + @Test + fun `QueuedIngestClientBuilder should accept withFabricPrivateLink`() { + val builder = + QueuedIngestClientBuilder.create(validDmUrl) + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + + assertNotNull(builder) + } + + @Test + fun `QueuedIngestClientBuilder build with FabricPrivateLink should succeed`() { + val client = + QueuedIngestClientBuilder.create(validDmUrl) + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + .build() + + assertNotNull(client) + } + + @Test + fun `QueuedIngestClientBuilder should chain withFabricPrivateLink correctly`() { + val builder = QueuedIngestClientBuilder.create(validDmUrl) + val result = + builder + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + .withMaxConcurrency(5) + .skipSecurityChecks() + + assertEquals(builder, result) + } + + // ==================== StreamingIngestClientBuilder Tests ==================== + + @Test + fun `StreamingIngestClientBuilder should accept withFabricPrivateLink`() { + val builder = + StreamingIngestClientBuilder.create(validDmUrl) + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + + assertNotNull(builder) + } + + @Test + fun `StreamingIngestClientBuilder build with FabricPrivateLink should succeed`() { + val client = + StreamingIngestClientBuilder.create(validDmUrl) + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + .build() + + assertNotNull(client) + } + + // ==================== ManagedStreamingIngestClientBuilder Tests ==================== + + @Test + fun `ManagedStreamingIngestClientBuilder should accept withFabricPrivateLink`() { + val builder = + ManagedStreamingIngestClientBuilder.create(validDmUrl) + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + + assertNotNull(builder) + } + + @Test + fun `ManagedStreamingIngestClientBuilder build with FabricPrivateLink should succeed`() { + val client = + ManagedStreamingIngestClientBuilder.create(validDmUrl) + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + .build() + + assertNotNull(client) + } + + @Test + fun `ManagedStreamingIngestClientBuilder should chain withFabricPrivateLink correctly`() { + val builder = ManagedStreamingIngestClientBuilder.create(validDmUrl) + val result = + builder + .withAuthentication(mockTokenCredential) + .withFabricPrivateLink(mockS2sTokenProvider, testAccessContext) + .skipSecurityChecks() + + assertEquals(builder, result) + } + + // ==================== Integration-like Tests ==================== + + @Test + fun `S2S token provider should be callable and return expected values`() { + runBlocking { + val s2sToken = mockS2sTokenProvider() + assertEquals(testS2SScheme, s2sToken.scheme) + assertEquals(testS2SToken, s2sToken.token) + assertEquals("$testS2SScheme $testS2SToken", s2sToken.toHeaderValue()) + } + } + + @Test + fun `DefaultConfigurationCache with custom provider respects S2S context`() { + var providerCalled = false + val mockConfigResponse: ConfigurationResponse = mockk(relaxed = true) + val customProvider: suspend () -> ConfigurationResponse = { + providerCalled = true + mockConfigResponse + } + + val cache = + DefaultConfigurationCache( + clientDetails = ClientDetails.createDefault(), + configurationProvider = customProvider, + s2sTokenProvider = mockS2sTokenProvider, + s2sFabricPrivateLinkAccessContext = testAccessContext, + ) + + runBlocking { + cache.getConfiguration() + } + + assertEquals(true, providerCalled) + } +}