diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index e673ea11..c7536f76 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -276,8 +276,9 @@ abstract class ContainerUploaderBase( effectiveCompressionType: CompressionType = local.compressionType, ): BlobSource { // Select container using incrementing counter for round-robin distribution + // Note: Math.floorMod handles negative values correctly if overflow occurs var containerIndex = - containerIndexCounter.getAndIncrement() % containers.size + Math.floorMod(containerIndexCounter.getAndIncrement(), containers.size) logger.debug( "Starting upload with {} containers, round-robin index: {}", @@ -389,9 +390,9 @@ abstract class ContainerUploaderBase( ) // TODO check and validate failure scenarios // Use semaphore for true streaming parallelism - // This allows up to maxConcurrency concurrent uploads, starting new ones as soon as slots + // This allows up to effectiveMaxConcurrency concurrent uploads, starting new ones as soon as slots // are available - val semaphore = Semaphore(maxConcurrency) + val semaphore = Semaphore(effectiveMaxConcurrency) // Launch all uploads concurrently, but semaphore limits actual concurrent execution val results = diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ICustomUploader.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ICustomUploader.kt index f2bd6bc9..ea82e90f 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ICustomUploader.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ICustomUploader.kt @@ -45,13 +45,6 @@ interface ICustomUploader : Closeable { ): CompletableFuture } -/** - * Extension function to convert [ICustomUploader] to [IUploader]. - * - * Kotlin users can use this as: `myCustomUploader.asUploader()` - */ -fun ICustomUploader.asUploader(): IUploader = CustomUploaderAdapter(this) - /** * Static helper methods for [ICustomUploader]. * diff --git a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java index 8dd33075..1d4f81c0 100644 --- a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java +++ b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java @@ -1,9 +1,8 @@ package com.microsoft.azure.kusto.quickstart; import com.azure.core.tracing.opentelemetry.OpenTelemetryTracer; +import com.azure.core.credential.TokenCredential; import com.azure.identity.AzureCliCredentialBuilder; -import com.azure.identity.ChainedTokenCredential; -import com.azure.identity.ChainedTokenCredentialBuilder; import com.azure.identity.ClientSecretCredentialBuilder; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.ObjectMapper; @@ -735,7 +734,7 @@ private static void runIngestV2Sample(@NotNull ConfigJson config) { } System.out.println("Running ingest-v2 quickstart sample..."); - ChainedTokenCredential credential = buildIngestV2Credential(ingestV2Config); + TokenCredential credential = buildIngestV2Credential(ingestV2Config); try (QueuedIngestClient queuedIngestClient = QueuedIngestClientBuilder.create(clusterPath) .withAuthentication(credential) @@ -757,25 +756,23 @@ private static void runIngestV2Sample(@NotNull ConfigJson config) { } } - private static ChainedTokenCredential buildIngestV2Credential(@NotNull IngestV2QuickstartConfig config) { + private static TokenCredential buildIngestV2Credential(@NotNull IngestV2QuickstartConfig config) { AuthenticationModeOptions mode = config.getAuthModeOverride(); if (mode == null) { mode = AuthenticationModeOptions.USER_PROMPT; } - ChainedTokenCredentialBuilder builder = new ChainedTokenCredentialBuilder(); if (mode == AuthenticationModeOptions.APP_KEY) { if (StringUtils.isBlank(config.getAppId()) || StringUtils.isBlank(config.getAppKey()) || StringUtils.isBlank(config.getTenantId())) { Utils.errorHandler("AppKey authentication requires 'APP_ID', 'APP_KEY', and 'APP_TENANT' environment variables or ingestV2 overrides."); } - builder.addFirst(new ClientSecretCredentialBuilder() + return new ClientSecretCredentialBuilder() .clientId(config.getAppId()) .clientSecret(config.getAppKey()) .tenantId(config.getTenantId()) - .build()); + .build(); } else { - builder.addFirst(new AzureCliCredentialBuilder().build()); + return new AzureCliCredentialBuilder().build(); } - return builder.build(); } private static @NotNull List> ingestV2FromStreams(ConfigJson config, IngestV2QuickstartConfig ingestV2Config, @@ -835,7 +832,7 @@ private static ChainedTokenCredential buildIngestV2Credential(@NotNull IngestV2Q @NotNull QueuedIngestClient queuedIngestClient) { System.out.println("\n=== Queued batch ingestion: Upload local files to blob, then ingest (ingest-v2) ==="); String clusterPath = ingestV2Config.getClusterPath(); - ChainedTokenCredential credential = buildIngestV2Credential(ingestV2Config); + TokenCredential credential = buildIngestV2Credential(ingestV2Config); ConfigurationCache configCache = DefaultConfigurationCache.create( clusterPath,