Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.microsoft.azure.kusto.ingest.v2
import com.azure.core.credential.TokenCredential
import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException
import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken
import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse
import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
import io.ktor.http.HttpStatusCode
Expand All @@ -16,12 +17,16 @@ class ConfigurationClient(
override val tokenCredential: TokenCredential,
override val skipSecurityChecks: Boolean = false,
override val clientDetails: ClientDetails,
override val s2sTokenProvider: (suspend () -> S2SToken)? = null,
override val s2sFabricPrivateLinkAccessContext: String? = null,
) :
KustoBaseApiClient(
dmUrl,
tokenCredential,
skipSecurityChecks,
clientDetails,
s2sTokenProvider = s2sTokenProvider,
s2sFabricPrivateLinkAccessContext = s2sFabricPrivateLinkAccessContext,
) {
private val logger =
LoggerFactory.getLogger(ConfigurationClient::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.azure.core.credential.TokenRequestContext
import com.microsoft.azure.kusto.ingest.v2.apis.DefaultApi
import com.microsoft.azure.kusto.ingest.v2.auth.endpoints.KustoTrustedEndpoints
import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken
import com.microsoft.azure.kusto.ingest.v2.common.serialization.OffsetDateTimeSerializer
import io.ktor.client.HttpClientConfig
import io.ktor.client.plugins.DefaultRequest
Expand All @@ -33,7 +34,7 @@ open class KustoBaseApiClient(
open val skipSecurityChecks: Boolean = false,
open val clientDetails: ClientDetails,
open val clientRequestIdPrefix: String = "KIC.execute",
open val s2sTokenProvider: (suspend () -> Pair<String, String>)? = null,
open val s2sTokenProvider: (suspend () -> S2SToken)? = null,
open val s2sFabricPrivateLinkAccessContext: String? = null,
) {
private val logger = LoggerFactory.getLogger(KustoBaseApiClient::class.java)
Expand Down Expand Up @@ -133,10 +134,10 @@ open class KustoBaseApiClient(
onRequest { request, _ ->
try {
// Get S2S token
val (token, scheme) = provider()
val s2sToken = provider()
request.headers.append(
"x-ms-s2s-actor-authorization",
"$scheme $token",
s2sToken.toHeaderValue(),
)

// Add Fabric Private Link access context header
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_CONCURRENCY
import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_DATA_SIZE_BYTES
import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache
import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken
import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader
import com.microsoft.azure.kusto.ingest.v2.uploader.ManagedUploader

Expand All @@ -17,7 +18,7 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> {
protected var clientDetails: ClientDetails? = null

// Fabric Private Link support
protected var s2sTokenProvider: (suspend () -> Pair<String, String>)? = null
protected var s2sTokenProvider: (suspend () -> S2SToken)? = null
protected var s2sFabricPrivateLinkAccessContext: String? = null

// Added properties for ingestion endpoint and authentication
Expand Down Expand Up @@ -60,7 +61,7 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> {
* @return This builder instance for method chaining
*/
fun withFabricPrivateLink(
s2sTokenProvider: suspend () -> Pair<String, String>,
s2sTokenProvider: suspend () -> S2SToken,
s2sFabricPrivateLinkAccessContext: String,
): T {
require(s2sFabricPrivateLinkAccessContext.isNotBlank()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ private constructor(private val dmUrl: String) :
tokenCredential = this.tokenCredential,
skipSecurityChecks = this.skipSecurityChecks,
clientDetails = effectiveClientDetails,
s2sTokenProvider = this.s2sTokenProvider,
s2sFabricPrivateLinkAccessContext =
this.s2sFabricPrivateLinkAccessContext,
)

val effectiveUploader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class QueuedIngestClientBuilder private constructor(private val dmUrl: String) :
tokenCredential = this.tokenCredential,
skipSecurityChecks = this.skipSecurityChecks,
clientDetails = effectiveClientDetails,
s2sTokenProvider = this.s2sTokenProvider,
s2sFabricPrivateLinkAccessContext =
this.s2sFabricPrivateLinkAccessContext,
)
val apiClient =
createApiClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.microsoft.azure.kusto.ingest.v2.CONFIG_CACHE_DEFAULT_REFRESH_INTERVAL
import com.microsoft.azure.kusto.ingest.v2.CONFIG_CACHE_DEFAULT_SKIP_SECURITY_CHECKS
import com.microsoft.azure.kusto.ingest.v2.ConfigurationClient
import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken
import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
import java.lang.AutoCloseable
import java.time.Duration
Expand Down Expand Up @@ -68,6 +69,8 @@ class DefaultConfigurationCache(
CONFIG_CACHE_DEFAULT_SKIP_SECURITY_CHECKS,
val clientDetails: ClientDetails,
val configurationProvider: (suspend () -> ConfigurationResponse)? = null,
val s2sTokenProvider: (suspend () -> S2SToken)? = null,
val s2sFabricPrivateLinkAccessContext: String? = null,
) : ConfigurationCache {
companion object {
/**
Expand Down Expand Up @@ -145,6 +148,8 @@ class DefaultConfigurationCache(
tokenCredential!!,
skipSecurityChecks!!,
clientDetails,
s2sTokenProvider,
s2sFabricPrivateLinkAccessContext,
)
.getConfigurationDetails()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.ingest.v2.common.models

/**
* Represents an S2S (Service-to-Service) authentication token used for
* Fabric Private Link authentication.
*
* @property scheme The authentication scheme (e.g., "Bearer")
* @property token The authentication token value
*/
data class S2SToken(
val scheme: String,
val token: String,
) {
/**
* Formats the token as an HTTP Authorization header value.
* @return The formatted header value in the format "{scheme} {token}"
*/
fun toHeaderValue(): String = "$scheme $token"

companion object {
/**
* Creates an S2SToken with the Bearer scheme.
* @param token The token value
* @return An S2SToken with scheme "Bearer"
*/
fun bearer(token: String): S2SToken = S2SToken("Bearer", token)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ abstract class ContainerUploaderBase(
}

// Compress stream if needed (for non-binary, non-compressed formats)
val (uploadStream, effectiveCompressionType, compressionJob) =
val preparedStream =
if (local.shouldCompress) {
logger.debug(
"Auto-compressing stream for {} (format: {}, original compression: {})",
Expand All @@ -137,35 +137,39 @@ abstract class ContainerUploaderBase(
name,
availableSize,
)
Triple(
compressResult.stream,
CompressionType.GZIP,
compressResult.compressionJob,
PreparedUploadStream(
stream = compressResult.stream,
compressionType = CompressionType.GZIP,
compressionJob = compressResult.compressionJob,
)
} else {
Triple(originalStream, local.compressionType, null)
PreparedUploadStream(
stream = originalStream,
compressionType = local.compressionType,
compressionJob = null,
)
}

// Upload with retry policy and container cycling
return try {
uploadWithRetries(
local = local,
name = name,
stream = uploadStream,
stream = preparedStream.stream,
containers = containers,
effectiveCompressionType = effectiveCompressionType,
effectiveCompressionType = preparedStream.compressionType,
)
.also {
// Ensure compression job completes successfully
compressionJob?.await()
preparedStream.compressionJob?.await()
logger.debug(
"Compression job completed successfully for {}",
name,
)
}
} catch (e: Exception) {
// Cancel compression job if upload fails
compressionJob?.cancel()
preparedStream.compressionJob?.cancel()
throw e
}
}
Expand Down Expand Up @@ -262,6 +266,16 @@ abstract class ContainerUploaderBase(
val compressionJob: kotlinx.coroutines.Deferred<Long>,
)

/**
* Helper class to hold prepared upload stream with its compression type
* and optional compression job.
*/
private data class PreparedUploadStream(
val stream: InputStream,
val compressionType: CompressionType,
val compressionJob: kotlinx.coroutines.Deferred<Long>?,
)

/**
* Uploads a stream with retry logic and container cycling. Uses an
* incrementing counter (mod container count) for round-robin container
Expand Down
Loading
Loading