diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestClientBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestClientBase.kt new file mode 100644 index 00000000..1014f8a1 --- /dev/null +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestClientBase.kt @@ -0,0 +1,139 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2 + +import java.net.URI + +/** + * Base utility object containing common functionality for ingest clients. + * + * This implementation matches the Java IngestClientBase behavior for handling + * special URLs like localhost, IP addresses, and reserved hostnames. + */ +object IngestClientBase { + private const val INGEST_PREFIX = "ingest-" + private const val PROTOCOL_SUFFIX = "://" + + /** + * Converts a cluster URL to an ingestion endpoint URL by adding the + * "ingest-" prefix. + * + * Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net) are + * returned unchanged to support local development and testing scenarios. + * + * @param clusterUrl The cluster URL to convert + * @return The ingestion endpoint URL with "ingest-" prefix, or the original + * URL for special cases + */ + @JvmStatic + fun getIngestionEndpoint(clusterUrl: String?): String? { + if (clusterUrl == null || + clusterUrl.contains(INGEST_PREFIX) || + isReservedHostname(clusterUrl) + ) { + return clusterUrl + } + return if (clusterUrl.contains(PROTOCOL_SUFFIX)) { + clusterUrl.replaceFirst( + PROTOCOL_SUFFIX, + "$PROTOCOL_SUFFIX$INGEST_PREFIX", + ) + } else { + INGEST_PREFIX + clusterUrl + } + } + + /** + * Converts an ingestion endpoint URL to a query endpoint URL by removing + * the "ingest-" prefix. + * + * Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net) are + * returned unchanged. + * + * @param clusterUrl The ingestion endpoint URL to convert + * @return The query endpoint URL without "ingest-" prefix, or the original + * URL for special cases + */ + @JvmStatic + fun getQueryEndpoint(clusterUrl: String?): String? { + return if (clusterUrl == null || isReservedHostname(clusterUrl)) { + clusterUrl + } else { + clusterUrl.replaceFirst(INGEST_PREFIX, "") + } + } + + /** + * Checks if the given URL points to a reserved hostname that should not + * have the "ingest-" prefix added. + * + * Reserved hostnames include: + * - localhost + * - IPv4 addresses (e.g., 127.0.0.1, 192.168.1.1) + * - IPv6 addresses (e.g., [2345:0425:2CA1:0000:0000:0567:5673:23b5]) + * - onebox.dev.kusto.windows.net (development environment) + * - Non-absolute URIs + * + * @param rawUri The URL to check + * @return true if the URL is a reserved hostname, false otherwise + */ + @JvmStatic + fun isReservedHostname(rawUri: String): Boolean { + val uri = + try { + URI.create(rawUri) + } catch (_: IllegalArgumentException) { + return true + } + + if (!uri.isAbsolute) { + return true + } + + val authority = uri.authority?.lowercase() ?: return true + + // Check for IPv6 address (wrapped in brackets) + val isIpAddress = + if (authority.startsWith("[") && authority.endsWith("]")) { + true + } else { + isIPv4Address(authority) + } + + val isLocalhost = authority.contains("localhost") + val host = uri.host?.lowercase() ?: "" + + return isLocalhost || + isIpAddress || + host.equals("onebox.dev.kusto.windows.net", ignoreCase = true) + } + + /** + * Checks if the given string is a valid IPv4 address. 
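+ * For example, `"127.0.0.1"` and `"127.0.0.1:8080"` (the port is stripped) are accepted, while `"376.568.1564.1564"` is rejected because its octets are out of range.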
+ * + * This method validates that the string consists of exactly 4 octets, each + * being a number between 0 and 255. + * + * @param address The string to check (may include port like "127.0.0.1:8080") + * @return true if the string is a valid IPv4 address, false otherwise + */ + private fun isIPv4Address(address: String): Boolean { + // Remove port if present (e.g., "127.0.0.1:8080" -> "127.0.0.1") + val hostPart = + if (address.contains(":")) { + address.substringBeforeLast(":") + } else { + address + } + + val parts = hostPart.split(".") + if (parts.size != 4) { + return false + } + + return parts.all { part -> + val num = part.toIntOrNull() + num != null && num in 0..255 + } + } +} \ No newline at end of file diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt index 2ad0546a..71d588f8 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/KustoBaseApiClient.kt @@ -54,7 +54,7 @@ open class KustoBaseApiClient( } val engineUrl: String - get() = dmUrl.replace(Regex("https://ingest-"), "https://") + get() = IngestClientBase.getQueryEndpoint(dmUrl) ?: dmUrl val api: DefaultApi by lazy { DefaultApi(baseUrl = dmUrl, httpClientConfig = setupConfig) diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt index e086c6f4..a03bdc61 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/BaseIngestClientBuilder.kt @@ -3,6 +3,7 @@ package com.microsoft.azure.kusto.ingest.v2.builders import com.azure.core.credential.TokenCredential +import com.microsoft.azure.kusto.ingest.v2.IngestClientBase import com.microsoft.azure.kusto.ingest.v2.KustoBaseApiClient import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_CONCURRENCY import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_DATA_SIZE_BYTES @@ -21,11 +22,6 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> { protected var s2sTokenProvider: (suspend () -> S2SToken)? = null protected var s2sFabricPrivateLinkAccessContext: String? = null - // Added properties for ingestion endpoint and authentication - protected var ingestionEndpoint: String? = null - protected var clusterEndpoint: String? = null - protected var authentication: TokenCredential? = null - protected var maxConcurrency: Int = UPLOAD_CONTAINER_MAX_CONCURRENCY protected var maxDataSize: Long = UPLOAD_CONTAINER_MAX_DATA_SIZE_BYTES protected var ignoreFileSize: Boolean = false @@ -37,7 +33,6 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> { fun withAuthentication(credential: TokenCredential): T { this.tokenCredential = credential - this.authentication = credential // Set authentication return self() } @@ -169,39 +164,34 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> { .build() } - protected fun setEndpoint(endpoint: String) { - this.ingestionEndpoint = normalizeAndCheckDmUrl(endpoint) - this.clusterEndpoint = normalizeAndCheckEngineUrl(endpoint) - } - companion object { + /** + * Converts an ingestion endpoint URL to a query/engine endpoint URL by + * removing the "ingest-" prefix.
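+ * For example, `"https://ingest-mycluster.kusto.windows.net"` (an illustrative cluster URL) becomes `"https://mycluster.kusto.windows.net"`.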
+ * + * Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net) + * are returned unchanged. + * + * @param clusterUrl The ingestion endpoint URL to convert + * @return The query endpoint URL without "ingest-" prefix + */ protected fun normalizeAndCheckEngineUrl(clusterUrl: String): String { - val normalizedUrl = - if (clusterUrl.matches(Regex("https://ingest-[^/]+.*"))) { - // If the URL starts with https://ingest-, remove ingest- - clusterUrl.replace( - Regex("https://ingest-([^/]+)"), - "https://$1", - ) - } else { - clusterUrl - } - return normalizedUrl + return IngestClientBase.getQueryEndpoint(clusterUrl) ?: clusterUrl } + /** + * Converts a cluster URL to an ingestion endpoint URL by adding the + * "ingest-" prefix. + * + * Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net) + * are returned unchanged to support local development and testing. + * + * @param dmUrl The cluster URL to convert + * @return The ingestion endpoint URL with "ingest-" prefix + */ @JvmStatic protected fun normalizeAndCheckDmUrl(dmUrl: String): String { - val normalizedUrl = - if (dmUrl.matches(Regex("https://(?!ingest-)[^/]+.*"))) { - // If the URL starts with https:// and does not already have ingest-, add it - dmUrl.replace( - Regex("https://([^/]+)"), - "https://ingest-$1", - ) - } else { - dmUrl - } - return normalizedUrl + return IngestClientBase.getIngestionEndpoint(dmUrl) ?: dmUrl } } } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt index 31395190..927b47cb 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/QueuedIngestClientBuilder.kt @@ -58,7 +58,6 @@ class QueuedIngestClientBuilder private constructor(private val dmUrl: String) : } fun build(): QueuedIngestClient { - setEndpoint(dmUrl) requireNotNull(tokenCredential) { "Authentication is required. Call withAuthentication() before build()" } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/StreamingIngestClientBuilder.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/StreamingIngestClientBuilder.kt index 02bfe646..2d2ab45d 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/StreamingIngestClientBuilder.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/builders/StreamingIngestClientBuilder.kt @@ -23,11 +23,9 @@ private constructor(private val clusterUrl: String) : } fun build(): StreamingIngestClient { - setEndpoint(clusterUrl) requireNotNull(tokenCredential) { "Authentication is required. Call withAuthentication() before build()" } - validateParameters() val effectiveClientDetails = clientDetails ?: ClientDetails.createDefault() val apiClient = @@ -39,11 +37,6 @@ private constructor(private val clusterUrl: String) : ) return StreamingIngestClient( apiClient = apiClient, - ) // Assuming these are set in BaseIngestClientBuilder - } - - private fun validateParameters() { - requireNotNull(ingestionEndpoint) { "Ingestion endpoint must be set." } - requireNotNull(authentication) { "Authentication must be set." 
} + ) } } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt index 5f0535de..53026a97 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt @@ -9,6 +9,7 @@ import com.microsoft.azure.kusto.ingest.v2.models.StatusResponse import com.microsoft.azure.kusto.ingest.v2.source.BlobSource import com.microsoft.azure.kusto.ingest.v2.source.IngestionSource import java.io.Closeable +import java.util.concurrent.CompletableFuture /** * Interface for ingesting data into Kusto. @@ -74,6 +75,70 @@ interface IngestClient : Closeable { suspend fun getOperationDetailsAsync( operation: IngestionOperation, ): StatusResponse + + // ========================================================================= + // Java-friendly methods returning CompletableFuture + // These have the "Java" suffix to avoid JVM signature conflicts with + // suspend functions. + // ========================================================================= + + /** + * Ingests data from the specified source. This is the Java-friendly version + * that returns a [CompletableFuture]. + * + * @param database The target database name. + * @param table The target table name. + * @param source The source to ingest. + * @param ingestRequestProperties Ingestion properties containing format, + * mapping, and other settings. + * @return A [CompletableFuture] that completes with an + * [ExtendedIngestResponse] containing the operation ID and ingestion + * kind. + */ + fun ingestAsyncJava( + database: String, + table: String, + source: IngestionSource, + ingestRequestProperties: IngestRequestProperties?, + ): CompletableFuture<ExtendedIngestResponse> + + /** + * Get the current status of an ingestion operation. This is the + * Java-friendly version that returns a [CompletableFuture]. + * + * Unlike [getOperationDetailsAsync], this method returns only the summary + * of the operation - statistics on the blobs ingested, and the operation + * status. + * + * To use this method, the [IngestRequestProperties.enableTracking] property + * must be set to true when ingesting the data. + * + * @param operation The ingestion operation to get the status for. + * @return A [CompletableFuture] that completes with a [Status] object + * providing a summary of the ingestion operation. + */ + fun getOperationSummaryAsyncJava( + operation: IngestionOperation, + ): CompletableFuture<Status> + + /** + * Get the current status of an ingestion operation. This is the + * Java-friendly version that returns a [CompletableFuture]. + * + * This method returns detailed information about the operation - statistics + * on the blobs ingested, and the operation status, as well as specific + * results for each blob. + * + * To use this method, the [IngestRequestProperties.enableTracking] property + * must be set to true when ingesting the data. + * + * @param operation The ingestion operation to get the status for. + * @return A [CompletableFuture] that completes with a [StatusResponse] + * object providing detailed information about the ingestion operation. + */ + fun getOperationDetailsAsyncJava( + operation: IngestionOperation, + ): CompletableFuture<StatusResponse> } /** Interface for ingesting from multiple data sources into Kusto. */ interface MultiIngestClient : IngestClient { @@ -121,4 +186,57 @@ interface MultiIngestClient : IngestClient { * request.
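+ * + * A minimal batching sketch (Kotlin, inside a coroutine; `client`, `sources`, and `properties` are illustrative names): + * ``` + * val max = client.getMaxSourcesPerMultiIngest() + * sources.chunked(max).forEach { batch -> + *     client.ingestAsync(database, table, batch, properties) + * } + * ```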
*/ + suspend fun getMaxSourcesPerMultiIngest(): Int + + // ========================================================================= + // Java-friendly methods returning CompletableFuture + // These have the "Java" suffix to avoid JVM signature conflicts with + // suspend functions. + // ========================================================================= + + /** + * Ingest data from multiple blob sources. This is the Java-friendly version + * that returns a [CompletableFuture]. + * + * **Important:** Multi-blob ingestion only supports [BlobSource]. This + * design avoids partial failure scenarios where some local sources might be + * uploaded successfully while others fail, leaving the user in an + * inconsistent state. + * + * **For local files/streams**, you have two options: + * 1. **Single-source ingestion**: Use `ingestAsync(source, properties)` + * with a single [com.microsoft.azure.kusto.ingest.v2.source.LocalSource] + * (FileSource or StreamSource). The client handles upload internally. + * 2. **Multi-source ingestion**: Use + * [com.microsoft.azure.kusto.ingest.v2.uploader.IUploader] to upload + * local sources to blob storage first, then call this method with the + * resulting [BlobSource] objects. + * + * @param database The target database name. + * @param table The target table name. + * @param sources The blob sources to ingest. All sources must be + * [BlobSource] instances. + * @param ingestRequestProperties Ingestion properties containing format, + * mapping, and other settings. + * @return A [CompletableFuture] that completes with an + * [ExtendedIngestResponse] containing the ingestion operation details. + */ + fun ingestAsyncJava( + database: String, + table: String, + sources: List<BlobSource>, + ingestRequestProperties: IngestRequestProperties?, + ): CompletableFuture<ExtendedIngestResponse> + + /** + * Returns the maximum number of sources that can be ingested in a single + * call to [ingestAsync]. This is the Java-friendly version that returns a + * [CompletableFuture]. + * + * This limit is imposed to avoid excessively large requests that could lead + * to performance degradation or failures. + * + * @return A [CompletableFuture] that completes with the maximum number of + * sources allowed in a single ingestion request. + */ + fun getMaxSourcesPerMultiIngestJava(): CompletableFuture<Int> } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/ManagedStreamingIngestClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/ManagedStreamingIngestClient.kt index f63fc534..42da40a7 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/ManagedStreamingIngestClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/ManagedStreamingIngestClient.kt @@ -178,15 +178,19 @@ internal constructor( * @return A [CompletableFuture] that completes with an * [ExtendedIngestResponse]. */ - @JvmName("ingestAsync") - fun ingestAsyncJava( + override fun ingestAsyncJava( database: String, table: String, source: IngestionSource, ingestRequestProperties: IngestRequestProperties?, ): CompletableFuture<ExtendedIngestResponse> = CoroutineScope(Dispatchers.IO).future { - ingestAsync(database, table, source, ingestRequestProperties) + ingestAsync( + database, + table, + source, + ingestRequestProperties, + ) } /** @@ -196,8 +200,7 @@ internal constructor( * @param operation The ingestion operation to get the status for. * @return A [CompletableFuture] that completes with a [Status] object.
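+ * + * A usage sketch (illustrative): + * ``` + * client.getOperationSummaryAsyncJava(operation) + *     .thenAccept { status -> println(status) } + * ```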
*/ - @JvmName("getOperationSummaryAsync") - fun getOperationSummaryAsyncJava( + override fun getOperationSummaryAsyncJava( operation: IngestionOperation, ): CompletableFuture<Status> = CoroutineScope(Dispatchers.IO).future { @@ -212,8 +215,7 @@ internal constructor( * @return A [CompletableFuture] that completes with a [StatusResponse] * object. */ - @JvmName("getOperationDetailsAsync") - fun getOperationDetailsAsyncJava( + override fun getOperationDetailsAsyncJava( operation: IngestionOperation, ): CompletableFuture<StatusResponse> = CoroutineScope(Dispatchers.IO).future { diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt index e4aaf090..e3a7e469 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt @@ -116,8 +116,7 @@ internal constructor( * already exist in blob storage, so no upload is performed - the request is * sent directly to the Data Management service. */ - @JvmName("ingestAsync") - fun ingestAsyncJava( + override fun ingestAsyncJava( database: String, table: String, sources: List<BlobSource>, @@ -136,8 +135,7 @@ internal constructor( * Ingests data from a single source with the given properties. This is the * Java-friendly version that returns a CompletableFuture. */ - @JvmName("ingestAsync") - fun ingestAsyncJava( + override fun ingestAsyncJava( database: String, table: String, source: IngestionSource, @@ -156,8 +154,7 @@ internal constructor( * Gets the operation summary for the specified ingestion operation. This is * the Java-friendly version that returns a CompletableFuture. */ - @JvmName("getOperationSummaryAsync") - fun getOperationSummaryAsyncJava( + override fun getOperationSummaryAsyncJava( operation: IngestionOperation, ): CompletableFuture<Status> = CoroutineScope(Dispatchers.IO).future { @@ -168,14 +165,23 @@ internal constructor( * Gets the detailed operation status for the specified ingestion operation. * This is the Java-friendly version that returns a CompletableFuture. */ - @JvmName("getOperationDetailsAsync") - fun getOperationDetailsAsyncJava( + override fun getOperationDetailsAsyncJava( operation: IngestionOperation, ): CompletableFuture<StatusResponse> = CoroutineScope(Dispatchers.IO).future { getOperationDetailsAsync(operation) } + /** + * Returns the maximum number of sources that can be ingested in a single + * call to [ingestAsync]. This is the Java-friendly version that returns a + * CompletableFuture. + */ + override fun getMaxSourcesPerMultiIngestJava(): CompletableFuture<Int> = + CoroutineScope(Dispatchers.IO).future { + getMaxSourcesPerMultiIngest() + } + /** * Polls the ingestion status until completion or timeout. This is the * Java-friendly version that returns a CompletableFuture. diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/StreamingIngestClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/StreamingIngestClient.kt index 7883cb41..9d9a41e4 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/StreamingIngestClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/StreamingIngestClient.kt @@ -126,8 +126,7 @@ internal constructor(private val apiClient: KustoBaseApiClient) : IngestClient { * @return A CompletableFuture that completes with an * ExtendedIngestResponse.
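+ * + * A usage sketch (illustrative names): + * ``` + * client.ingestAsyncJava(database, table, source, properties) + *     .thenAccept { response -> + *         println(response.ingestResponse.ingestionOperationId) + *     } + * ```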
*/ - @JvmName("ingestAsync") - fun ingestAsyncJava( + override fun ingestAsyncJava( database: String, table: String, source: IngestionSource, @@ -152,8 +151,7 @@ internal constructor(private val apiClient: KustoBaseApiClient) : IngestClient { * @param operation The ingestion operation to get the status for. * @return A CompletableFuture that completes with a Status object. */ - @JvmName("getOperationSummaryAsync") - fun getOperationSummaryAsyncJava( + override fun getOperationSummaryAsyncJava( operation: IngestionOperation, ): CompletableFuture<Status> = CoroutineScope(Dispatchers.IO).future { @@ -170,8 +168,7 @@ internal constructor(private val apiClient: KustoBaseApiClient) : IngestClient { * @param operation The ingestion operation to get the details for. * @return A CompletableFuture that completes with a StatusResponse object. */ - @JvmName("getOperationDetailsAsync") - fun getOperationDetailsAsyncJava( + override fun getOperationDetailsAsyncJava( operation: IngestionOperation, ): CompletableFuture<StatusResponse> = CoroutineScope(Dispatchers.IO).future { diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt index d581edc2..761a1f1a 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt @@ -9,11 +9,61 @@ import com.microsoft.azure.kusto.ingest.v2.ConfigurationClient import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse +import com.microsoft.azure.kusto.ingest.v2.uploader.ExtendedContainerInfo +import com.microsoft.azure.kusto.ingest.v2.uploader.RoundRobinContainerList +import com.microsoft.azure.kusto.ingest.v2.uploader.UploadMethod import java.lang.AutoCloseable import java.time.Duration import java.util.concurrent.atomic.AtomicReference import kotlin.math.min +/** + * Wrapper around ConfigurationResponse that includes shared RoundRobinContainerList + * instances for even distribution of uploads across containers. + * + * This class holds the configuration response along with pre-created container lists + * that maintain their own atomic counters. All uploaders sharing the same cache + * will use the same RoundRobinContainerList instances, ensuring proper load + * distribution. + */ +class CachedConfigurationData( + val response: ConfigurationResponse, +) { + /** + * Lazily initialized RoundRobinContainerList for storage containers. + * The list is created once and reused for all requests until the cache refreshes. + */ + val storageContainerList: RoundRobinContainerList by lazy { + val containers = response.containerSettings?.containers + if (containers.isNullOrEmpty()) { + RoundRobinContainerList.empty() + } else { + RoundRobinContainerList.of( + containers.map { ExtendedContainerInfo(it, UploadMethod.STORAGE) } + ) + } + } + + /** + * Lazily initialized RoundRobinContainerList for lake containers. + * The list is created once and reused for all requests until the cache refreshes.
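+ * All uploaders that read configuration through this cache share these list instances, and therefore one counter per container type; with two lake folders, successive uploads across uploaders receive indices 0, 1, 0, 1, ...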
+ */ + val lakeContainerList: RoundRobinContainerList by lazy { + val lakeFolders = response.containerSettings?.lakeFolders + if (lakeFolders.isNullOrEmpty()) { + RoundRobinContainerList.empty() + } else { + RoundRobinContainerList.of( + lakeFolders.map { ExtendedContainerInfo(it, UploadMethod.LAKE) } + ) + } + } + + // Delegate all ConfigurationResponse properties + val containerSettings get() = response.containerSettings + val ingestionSettings get() = response.ingestionSettings +} + /** * Interface for caching configuration data. * @@ -29,8 +79,12 @@ interface ConfigurationCache : AutoCloseable { * Gets the current configuration, refreshing it if necessary based on the * refresh interval. This method may return cached data if the cache is * still valid. + * + * The returned CachedConfigurationData includes shared RoundRobinContainerList + * instances that provide even distribution of uploads across containers for + * all uploaders sharing this cache. */ - suspend fun getConfiguration(): ConfigurationResponse + suspend fun getConfiguration(): CachedConfigurationData override fun close() } @@ -160,9 +214,12 @@ class DefaultConfigurationCache( * interval atomically. This prevents race conditions between checking * expiration and updating, and ensures we use the correct refresh interval * from when the config was fetched. + * + * The CachedConfigurationData wrapper includes pre-created RoundRobinContainerList + * instances that are shared by all uploaders using this cache. */ private data class CachedData( - val configuration: ConfigurationResponse, + val configuration: CachedConfigurationData, val timestamp: Long, val refreshInterval: Long, ) @@ -225,7 +282,7 @@ class DefaultConfigurationCache( * default refresh interval. */ private fun calculateEffectiveRefreshInterval( - config: ConfigurationResponse?, + config: CachedConfigurationData?, ): Long { val configRefreshInterval = config?.containerSettings?.refreshInterval return if (configRefreshInterval?.isNotEmpty() == true) { @@ -241,7 +298,7 @@ class DefaultConfigurationCache( } } - override suspend fun getConfiguration(): ConfigurationResponse { + override suspend fun getConfiguration(): CachedConfigurationData { val currentTime = System.currentTimeMillis() val cachedData = cache.get() @@ -254,13 +311,17 @@ class DefaultConfigurationCache( if (needsRefresh) { // Attempt to refresh - only one thread will succeed - val newConfig = + val newConfigResponse = runCatching { provider() } .getOrElse { // If fetch fails, return cached if available, otherwise rethrow - cachedData?.configuration ?: throw it + cachedData?.configuration?.response ?: throw it } + // Wrap the response in CachedConfigurationData to create shared + // RoundRobinContainerList instances + val newConfig = CachedConfigurationData(newConfigResponse) + // Calculate effective refresh interval from the NEW configuration val newEffectiveRefreshInterval = calculateEffectiveRefreshInterval(newConfig) diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index 71dc91af..71b7a806 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -46,7 +46,6 @@ import java.time.Clock import java.time.Duration import java.time.Instant import java.util.concurrent.CompletableFuture -import 
java.util.concurrent.atomic.AtomicInteger import java.util.zip.GZIPOutputStream /** Represents an abstract base class for uploaders to storage containers. */ @@ -65,12 +64,6 @@ abstract class ContainerUploaderBase( private val effectiveMaxConcurrency: Int = minOf(maxConcurrency, Runtime.getRuntime().availableProcessors()) - /** - * Atomic counter for round-robin container selection. Increments on each - * upload to distribute load evenly across containers. - */ - private val containerIndexCounter = AtomicInteger(0) - override var ignoreSizeLimit: Boolean = false override fun close() { @@ -277,25 +270,25 @@ abstract class ContainerUploaderBase( ) /** - * Uploads a stream with retry logic and container cycling. Uses an - * incrementing counter (mod container count) for round-robin container - * selection, ensuring even load distribution across containers on each - * retry. For example, with 2 containers and 3 retries: 0->1->0 or 1->0->1 + * Uploads a stream with retry logic and container cycling. Uses the + * shared counter from the RoundRobinContainerList for round-robin container + * selection, ensuring even load distribution across containers for all + * uploaders sharing the same ConfigurationCache. + * For example, with 2 containers and uploaders A and B: + * - A's 1st upload uses container 0 + * - B's 1st upload uses container 1 + * - A's 2nd upload uses container 0 (cycles back) */ private suspend fun uploadWithRetries( local: LocalSource, name: String, stream: InputStream, - containers: List<ExtendedContainerInfo>, + containers: RoundRobinContainerList, effectiveCompressionType: CompressionType = local.compressionType, ): BlobSource { - // Select container using incrementing counter for round-robin distribution - // Note: Math.floorMod handles negative values correctly if overflow occurs - var containerIndex = - Math.floorMod( - containerIndexCounter.getAndIncrement(), - containers.size, - ) + // Select container using the shared counter from RoundRobinContainerList - // This ensures even distribution across all uploaders sharing the same cache + var containerIndex = containers.getNextStartIndex() logger.debug( "Starting upload with {} containers, round-robin index: {}", @@ -776,11 +769,15 @@ abstract class ContainerUploaderBase( * Selects the appropriate containers for upload based on the uploader's * configuration cache and the specified upload method. * + * The returned RoundRobinContainerList contains a shared counter that + * enables even distribution of uploads across containers, even when + * multiple uploaders share the same ConfigurationCache. + * * @param uploadMethod The upload method to consider when selecting * containers. - * @return A list of selected container information. + * @return A RoundRobinContainerList with shared counter for load distribution.
*/ abstract suspend fun selectContainers( uploadMethod: UploadMethod, - ): List<ExtendedContainerInfo> + ): RoundRobinContainerList } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt index df38b52e..1752a5f8 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt @@ -41,12 +41,13 @@ internal constructor( override suspend fun selectContainers( uploadMethod: UploadMethod, - ): List<ExtendedContainerInfo> { + ): RoundRobinContainerList { // This method is delegated to and this calls getConfiguration again to ensure fresh data is // retrieved // or cached data is used as appropriate. + val configuration = configurationCache.getConfiguration() val containerSettings = - configurationCache.getConfiguration().containerSettings + configuration.containerSettings ?: throw IngestException( "No container settings available", isPermanent = true, @@ -112,23 +113,19 @@ internal constructor( UploadMethod.LAKE } } + + // Get the appropriate RoundRobinContainerList from the configuration cache + // The cache maintains shared counters for each container type to ensure + // even distribution across all uploaders sharing the same cache return when { effectiveMethod == UploadMethod.LAKE && hasLake -> - containerSettings.lakeFolders.map { - ExtendedContainerInfo(it, UploadMethod.LAKE) - } + configuration.lakeContainerList effectiveMethod == UploadMethod.STORAGE && hasStorage -> - containerSettings.containers.map { - ExtendedContainerInfo(it, UploadMethod.STORAGE) - } + configuration.storageContainerList hasStorage -> - containerSettings.containers.map { - ExtendedContainerInfo(it, UploadMethod.STORAGE) - } + configuration.storageContainerList else -> - containerSettings.lakeFolders!!.map { - ExtendedContainerInfo(it, UploadMethod.LAKE) - } + configuration.lakeContainerList } } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/RoundRobinContainerList.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/RoundRobinContainerList.kt new file mode 100644 index 00000000..97cb6166 --- /dev/null +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/RoundRobinContainerList.kt @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2.uploader + +import java.util.concurrent.atomic.AtomicLong + +/** + * A thread-safe wrapper around a list of containers that provides round-robin + * selection across all clients sharing the same list instance. + * + * The counter is stored within this class, so all uploaders sharing the same + * ConfigurationCache (and thus the same RoundRobinContainerList instances) + * will distribute their uploads evenly across containers. + * + * For example, if uploader A and uploader B share the same cache: + * - Uploader A's first upload goes to container 0 + * - Uploader B's first upload goes to container 1 + * - Uploader A's second upload goes to container 2 + * - etc. + * + * This is similar to the C# implementation's MonitoredContainerCyclicEnumerator. + * + * @param containers The list of containers to cycle through + */ +class RoundRobinContainerList( + private val containers: List<ExtendedContainerInfo>, +) : List<ExtendedContainerInfo> by containers { + + /** + * Atomic counter for round-robin container selection.
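+ * With 3 containers, successive [getNextStartIndex] calls yield 0, 1, 2, 0, 1, 2, ...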
+ * Uses Long to handle high-frequency uploads without overflow concerns. + * Initialized to -1 so the first incrementAndGet returns 0. + */ + private val currentIndex = AtomicLong(-1) + + /** + * Gets the next starting index for round-robin container selection. + * Each call returns the next index in sequence, wrapping around. + * + * @return The next index to start from (0 to size-1) + */ + fun getNextStartIndex(): Int { + if (containers.isEmpty()) { + return 0 + } + // Use Math.floorMod to handle potential negative values from overflow + return Math.floorMod(currentIndex.incrementAndGet(), containers.size.toLong()).toInt() + } + + /** + * Creates a copy of the underlying container list. + * This is useful when the list needs to be passed to APIs + * that don't support RoundRobinContainerList. + */ + fun toList(): List<ExtendedContainerInfo> = containers.toList() + + companion object { + /** + * Creates an empty RoundRobinContainerList. + */ + fun empty(): RoundRobinContainerList = RoundRobinContainerList(emptyList()) + + /** + * Creates a RoundRobinContainerList from a list of ExtendedContainerInfo. + */ + fun of(containers: List<ExtendedContainerInfo>): RoundRobinContainerList = + RoundRobinContainerList(containers) + } +} \ No newline at end of file diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java index b4acdc24..ad536b6e 100644 --- a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java @@ -51,8 +51,9 @@ public IngestV2JavaTestBase(Class<?> testClass) { if (this.dmEndpoint == null) { throw new IllegalArgumentException("DM_CONNECTION_STRING environment variable is not set"); } - - this.engineEndpoint = dmEndpoint.replace("https://ingest-", "https://"); + + String queryEndpoint = IngestClientBase.getQueryEndpoint(dmEndpoint); + this.engineEndpoint = queryEndpoint != null ?
queryEndpoint : dmEndpoint; // Define table schema diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/ManagedStreamingIngestClientJavaTest.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/ManagedStreamingIngestClientJavaTest.java index c77d0bc3..c7520da4 100644 --- a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/ManagedStreamingIngestClientJavaTest.java +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/ManagedStreamingIngestClientJavaTest.java @@ -83,7 +83,7 @@ public void testManagedStreamingIngestSmallData(boolean useIngestRequestProperti // Ingest data (should use streaming for small data) logger.info("Ingesting small data via managed streaming..."); - ExtendedIngestResponse response = client.ingestAsync(database,targetTable,source, properties).get(); + ExtendedIngestResponse response = client.ingestAsyncJava(database,targetTable,source, properties).get(); assertNotNull(response, "Response should not be null"); if (useIngestRequestProperties) { @@ -157,7 +157,7 @@ public void testManagedStreamingIngestWithFallback(boolean useIngestRequestPrope .build() : null; logger.info("Ingesting larger data via managed streaming (may trigger fallback)..."); - ExtendedIngestResponse response = client.ingestAsync(database,targetTable,source, properties).get(); + ExtendedIngestResponse response = client.ingestAsyncJava(database,targetTable,source, properties).get(); assertNotNull(response, "Response should not be null"); @@ -230,7 +230,7 @@ public void testManagedStreamingIngestFromFileSource(boolean useIngestRequestPro .build() : null; logger.info("Ingesting file via managed streaming..."); - ExtendedIngestResponse response = client.ingestAsync(database,targetTable,fileSource, properties).get(); + ExtendedIngestResponse response = client.ingestAsyncJava(database,targetTable,fileSource, properties).get(); assertNotNull(response, "Response should not be null"); if (useIngestRequestProperties) { diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientJavaTest.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientJavaTest.java index 9f0de901..23077602 100644 --- a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientJavaTest.java +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientJavaTest.java @@ -84,7 +84,7 @@ public void testBasicQueuedIngest(boolean useIngestRequestProperties) throws Exc // Queue data for ingestion logger.info("Queueing data for ingestion..."); - ExtendedIngestResponse response = client.ingestAsync(database, targetTable,source, properties).get(); + ExtendedIngestResponse response = client.ingestAsyncJava(database, targetTable,source, properties).get(); assertNotNull(response, "Response should not be null"); if(useIngestRequestProperties) { @@ -104,7 +104,7 @@ public void testBasicQueuedIngest(boolean useIngestRequestProperties) throws Exc ); // Get initial status - StatusResponse initialStatus = client.getOperationDetailsAsync(operation).get(); + StatusResponse initialStatus = client.getOperationDetailsAsyncJava(operation).get(); assertNotNull(initialStatus, "Initial status should not be null"); logger.info("Initial status retrieved"); @@ -166,7 +166,7 @@ public void testQueuedIngestFromFileSource() throws Exception { .build(); logger.info("Queueing file for ingestion..."); - ExtendedIngestResponse response = client.ingestAsync(database, targetTable,fileSource, properties).get(); + 
ExtendedIngestResponse response = client.ingestAsyncJava(database, targetTable,fileSource, properties).get(); assertNotNull(response, "Response should not be null"); logger.info("File queued. Operation ID: {}", diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/StreamingIngestClientJavaTest.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/StreamingIngestClientJavaTest.java index 16dd6f12..a4ec34cc 100644 --- a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/StreamingIngestClientJavaTest.java +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/StreamingIngestClientJavaTest.java @@ -80,7 +80,7 @@ public void testBasicStreamingIngest(boolean useIngestRequestProperties) throws // Ingest data logger.info("Ingesting data via streaming..."); - ExtendedIngestResponse response = client.ingestAsync(database, targetTable,source, properties).get(); + ExtendedIngestResponse response = client.ingestAsyncJava(database, targetTable,source, properties).get(); assertNotNull(response, "Response should not be null"); if (useIngestRequestProperties) { @@ -139,7 +139,7 @@ public void testStreamingIngestWithCompression(boolean useIngestRequestPropertie .build() : null; logger.info("Ingesting compressed data..."); - ExtendedIngestResponse response = client.ingestAsync(database, targetTable,source, properties).get(); + ExtendedIngestResponse response = client.ingestAsyncJava(database, targetTable,source, properties).get(); assertNotNull(response, "Response should not be null"); if (useIngestRequestProperties) { diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClientTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClientTest.kt index 9006e7a1..bcdc95d8 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClientTest.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/ConfigurationClientTest.kt @@ -100,15 +100,16 @@ class ConfigurationClientTest : ) val config = defaultCachedConfig.getConfiguration() assertNotNull(config, "Configuration should not be null") + val containerSettings = config.containerSettings assertNotNull( - config.containerSettings, + containerSettings, "ContainerSettings should not be null", ) assertNotNull( - config.containerSettings.preferredUploadMethod, + containerSettings!!.preferredUploadMethod, "Preferred upload should not be null", ) - config.containerSettings.containers?.forEach { containerInfo -> + containerSettings.containers?.forEach { containerInfo -> run { assertNotNull( containerInfo.path, diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestClientBaseTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestClientBaseTest.kt new file mode 100644 index 00000000..ac065d8b --- /dev/null +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestClientBaseTest.kt @@ -0,0 +1,219 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+package com.microsoft.azure.kusto.ingest.v2 + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource +import java.util.stream.Stream + +/** + * Tests for [IngestClientBase] URL handling utilities. + * + * These tests match the Java IngestClientBaseTest to ensure parity. + */ +class IngestClientBaseTest { + + companion object { + @JvmStatic + fun provideStringsForGetIngestionEndpoint(): Stream<Arguments> = + Stream.of( + // Normal URLs should get ingest- prefix + Arguments.of( + "https://testendpoint.dev.kusto.windows.net", + "https://ingest-testendpoint.dev.kusto.windows.net", + ), + Arguments.of( + "https://shouldwork", + "https://ingest-shouldwork", + ), + // Non-IP hostnames that look like IPs should get prefix + Arguments.of( + "https://192.shouldwork.1.1", + "https://ingest-192.shouldwork.1.1", + ), + Arguments.of( + "https://2345:shouldwork:0425", + "https://ingest-2345:shouldwork:0425", + ), + Arguments.of( + "https://376.568.1564.1564", + "https://ingest-376.568.1564.1564", + ), + // Valid IPv4 addresses should NOT get prefix + Arguments.of( + "https://192.168.1.1", + "https://192.168.1.1", + ), + Arguments.of( + "https://127.0.0.1", + "https://127.0.0.1", + ), + // IPv6 addresses should NOT get prefix + Arguments.of( + "https://[2345:0425:2CA1:0000:0000:0567:5673:23b5]", + "https://[2345:0425:2CA1:0000:0000:0567:5673:23b5]", + ), + // Localhost should NOT get prefix + Arguments.of( + "https://localhost", + "https://localhost", + ), + // Onebox dev should NOT get prefix + Arguments.of( + "https://onebox.dev.kusto.windows.net", + "https://onebox.dev.kusto.windows.net", + ), + ) + + @JvmStatic + fun provideStringsForGetQueryEndpoint(): Stream<Arguments> = + Stream.of( + // Should remove ingest- prefix + Arguments.of( + "https://ingest-testendpoint.dev.kusto.windows.net", + "https://testendpoint.dev.kusto.windows.net", + ), + // No ingest- prefix should return unchanged + Arguments.of( + "https://testendpoint.dev.kusto.windows.net", + "https://testendpoint.dev.kusto.windows.net", + ), + // Reserved hostnames should return unchanged + Arguments.of( + "https://localhost", + "https://localhost", + ), + Arguments.of( + "https://127.0.0.1", + "https://127.0.0.1", + ), + Arguments.of( + "https://onebox.dev.kusto.windows.net", + "https://onebox.dev.kusto.windows.net", + ), + ) + } + + @ParameterizedTest + @MethodSource("provideStringsForGetIngestionEndpoint") + fun `getIngestionEndpoint should correctly transform URLs`( + input: String, + expected: String, + ) { + val actual = IngestClientBase.getIngestionEndpoint(input) + assertEquals(expected, actual) + } + + @ParameterizedTest + @MethodSource("provideStringsForGetQueryEndpoint") + fun `getQueryEndpoint should correctly transform URLs`( + input: String, + expected: String, + ) { + val actual = IngestClientBase.getQueryEndpoint(input) + assertEquals(expected, actual) + } + + @Test + fun `getIngestionEndpoint with null should return null`() { + assertNull(IngestClientBase.getIngestionEndpoint(null)) + } + + @Test + fun `getQueryEndpoint with null should return null`() { + assertNull(IngestClientBase.getQueryEndpoint(null)) + } + + @Test + fun `getIngestionEndpoint with existing ingest prefix should return unchanged`() {
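+ // The prefix check is a substring match on "ingest-", so an already-prefixed URL is returned as-is (idempotent).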
+ val url = "https://ingest-test.kusto.windows.net" + assertEquals(url, IngestClientBase.getIngestionEndpoint(url)) + } + + @Test + fun `getIngestionEndpoint without protocol should return unchanged`() { + // URLs without protocol are treated as non-absolute URIs (reserved) + // and returned unchanged - matching Java IngestClientBase behavior + assertEquals( + "test.kusto.windows.net", + IngestClientBase.getIngestionEndpoint("test.kusto.windows.net"), + ) + } + + @Test + fun `isReservedHostname for localhost should return true`() { + assertTrue(IngestClientBase.isReservedHostname("https://localhost")) + assertTrue(IngestClientBase.isReservedHostname("https://localhost:8080")) + assertTrue( + IngestClientBase.isReservedHostname("https://localhost/path"), + ) + } + + @Test + fun `isReservedHostname for valid IPv4 should return true`() { + assertTrue(IngestClientBase.isReservedHostname("https://192.168.1.1")) + assertTrue(IngestClientBase.isReservedHostname("https://10.0.0.1")) + assertTrue(IngestClientBase.isReservedHostname("https://127.0.0.1")) + } + + @Test + fun `isReservedHostname for IPv6 should return true`() { + assertTrue( + IngestClientBase.isReservedHostname( + "https://[2345:0425:2CA1:0000:0000:0567:5673:23b5]", + ), + ) + assertTrue(IngestClientBase.isReservedHostname("https://[::1]")) + } + + @Test + fun `isReservedHostname for onebox dev should return true`() { + assertTrue( + IngestClientBase.isReservedHostname( + "https://onebox.dev.kusto.windows.net", + ), + ) + assertTrue( + IngestClientBase.isReservedHostname( + "https://ONEBOX.DEV.KUSTO.WINDOWS.NET", + ), + ) + } + + @Test + fun `isReservedHostname for normal URLs should return false`() { + assertFalse( + IngestClientBase.isReservedHostname( + "https://test.kusto.windows.net", + ), + ) + assertFalse( + IngestClientBase.isReservedHostname( + "https://ingest-test.kusto.windows.net", + ), + ) + } + + @Test + fun `isReservedHostname for invalid IPv4-like strings should return false`() { + // These look like IPs but aren't valid (out of range or wrong format) + assertFalse( + IngestClientBase.isReservedHostname("https://376.568.1564.1564"), + ) + assertFalse( + IngestClientBase.isReservedHostname("https://192.shouldwork.1.1"), + ) + } + + @Test + fun `isReservedHostname for non-absolute URI should return true`() { + assertTrue(IngestClientBase.isReservedHostname("not-a-valid-uri")) + assertTrue(IngestClientBase.isReservedHostname("test.kusto.windows.net")) + } +} \ No newline at end of file diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt index 25bf6dc5..a0e64ae9 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt @@ -32,7 +32,7 @@ abstract class IngestV2TestBase(testClass: Class<*>) { protected val oneLakeFolder: String? 
= System.getenv("ONE_LAKE_FOLDER") protected val targetTestFormat = Format.json protected val engineEndpoint: String = - dmEndpoint.replace("https://ingest-", "https://") + IngestClientBase.getQueryEndpoint(dmEndpoint) ?: dmEndpoint lateinit var targetTable: String protected val columnNamesToTypes: Map = mapOf( diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt index d201de9e..97a2cd08 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.microsoft.azure.kusto.ingest.v2.uploader +import com.microsoft.azure.kusto.ingest.v2.common.CachedConfigurationData import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache import com.microsoft.azure.kusto.ingest.v2.common.serialization.OffsetDateTimeSerializer import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse @@ -72,7 +73,7 @@ class ManagedUploaderTest { override val refreshInterval: Duration get() = Duration.ofHours(1) - override suspend fun getConfiguration(): ConfigurationResponse { + override suspend fun getConfiguration(): CachedConfigurationData { val resourcesDirectory = "src/test/resources/" val fileName = "config-response.json" val configContent = @@ -87,7 +88,7 @@ class ManagedUploaderTest { assertNotNull(configurationResponse) assertNotNull(configurationResponse.containerSettings) - return configurationResponse + return CachedConfigurationData(configurationResponse) } override fun close() { diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/RoundRobinContainerListTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/RoundRobinContainerListTest.kt new file mode 100644 index 00000000..63608368 --- /dev/null +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/RoundRobinContainerListTest.kt @@ -0,0 +1,229 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2.uploader + +import com.microsoft.azure.kusto.ingest.v2.models.ContainerInfo +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.parallel.Execution +import org.junit.jupiter.api.parallel.ExecutionMode +import java.util.concurrent.ConcurrentHashMap + +/** + * Tests for RoundRobinContainerList to ensure proper container cycling behavior + * and thread-safety when multiple uploaders share the same list. 
+ */ +@Execution(ExecutionMode.CONCURRENT) +class RoundRobinContainerListTest { + + private fun createContainers(count: Int): List<ExtendedContainerInfo> = + (0 until count).map { i -> + ExtendedContainerInfo( + ContainerInfo("https://storage$i.blob.core.windows.net/container$i"), + UploadMethod.STORAGE + ) + } + + @Test + fun `empty list returns isEmpty true`() { + val list = RoundRobinContainerList.empty() + assertTrue(list.isEmpty()) + assertEquals(0, list.size) + } + + @Test + fun `non-empty list returns isEmpty false`() { + val containers = createContainers(3) + val list = RoundRobinContainerList.of(containers) + assertFalse(list.isEmpty()) + assertEquals(3, list.size) + } + + @Test + fun `getNextStartIndex cycles through all containers`() { + val containers = createContainers(3) + val list = RoundRobinContainerList.of(containers) + + // Should cycle through 0, 1, 2, 0, 1, 2, ... + assertEquals(0, list.getNextStartIndex()) + assertEquals(1, list.getNextStartIndex()) + assertEquals(2, list.getNextStartIndex()) + assertEquals(0, list.getNextStartIndex()) + assertEquals(1, list.getNextStartIndex()) + assertEquals(2, list.getNextStartIndex()) + } + + @Test + fun `index access returns correct container`() { + val containers = createContainers(3) + val list = RoundRobinContainerList.of(containers) + + assertEquals(containers[0], list[0]) + assertEquals(containers[1], list[1]) + assertEquals(containers[2], list[2]) + } + + @Test + fun `iterator returns all containers`() { + val containers = createContainers(3) + val list = RoundRobinContainerList.of(containers) + + val iterated = list.toList() + assertEquals(containers, iterated) + } + + @Test + fun `single container list always returns index 0`() { + val containers = createContainers(1) + val list = RoundRobinContainerList.of(containers) + + // With only 1 container, all calls should return 0 + assertEquals(0, list.getNextStartIndex()) + assertEquals(0, list.getNextStartIndex()) + assertEquals(0, list.getNextStartIndex()) + } + + @Test + fun `multiple uploaders sharing same list get different start indices`() { + // Simulate multiple uploaders (A, B, C) sharing the same list + val containers = createContainers(5) + val list = RoundRobinContainerList.of(containers) + + // Simulate sequential uploads from different uploaders + val uploaderAIndex1 = list.getNextStartIndex() // Uploader A's 1st upload + val uploaderBIndex1 = list.getNextStartIndex() // Uploader B's 1st upload + val uploaderCIndex1 = list.getNextStartIndex() // Uploader C's 1st upload + val uploaderAIndex2 = list.getNextStartIndex() // Uploader A's 2nd upload + val uploaderBIndex2 = list.getNextStartIndex() // Uploader B's 2nd upload + + // Each should get a different starting index (cycling) + assertEquals(0, uploaderAIndex1) + assertEquals(1, uploaderBIndex1) + assertEquals(2, uploaderCIndex1) + assertEquals(3, uploaderAIndex2) + assertEquals(4, uploaderBIndex2) + } + + @Test + fun `concurrent access produces even distribution`() = runBlocking { + val containers = createContainers(4) + val list = RoundRobinContainerList.of(containers) + + // Track which containers are selected + val containerSelectionCounts = ConcurrentHashMap<Int, Int>() + (0..3).forEach { containerSelectionCounts[it] = 0 } + + // Simulate 1000 concurrent uploads + val numConcurrentUploads = 1000 + val jobs = (0 until numConcurrentUploads).map { + async(Dispatchers.Default) { + val index = list.getNextStartIndex() + containerSelectionCounts.compute(index) { _, count -> (count ?: 0) + 1 } + } + } + jobs.awaitAll() + + // Verify distribution
is even (each container should have ~250 selections) + val expectedPerContainer = numConcurrentUploads / containers.size + val tolerance = expectedPerContainer * 0.1 // 10% tolerance for timing variations + + containerSelectionCounts.forEach { (index, count) -> + assertTrue( + count >= expectedPerContainer - tolerance && count <= expectedPerContainer + tolerance, + "Container $index was selected $count times, expected around $expectedPerContainer" + ) + } + + // Total should equal numConcurrentUploads + assertEquals(numConcurrentUploads, containerSelectionCounts.values.sum()) + } + + @Test + fun `counter handles integer overflow gracefully`() { + val containers = createContainers(3) + val list = RoundRobinContainerList.of(containers) + + // Call getNextStartIndex many times to verify it doesn't break + // We can't easily test Long.MAX_VALUE overflow, but we can verify stability + repeat(10000) { + val index = list.getNextStartIndex() + assertTrue(index in 0..2, "Index $index should be in range 0..2") + } + } + + @Test + fun `forEach iterates over all containers`() { + val containers = createContainers(3) + val list = RoundRobinContainerList.of(containers) + + val visited = mutableListOf<ExtendedContainerInfo>() + list.forEach { visited.add(it) } + + assertEquals(containers, visited) + } + + @Test + fun `isNotEmpty returns correct value`() { + val emptyList = RoundRobinContainerList.empty() + val nonEmptyList = RoundRobinContainerList.of(createContainers(2)) + + assertFalse(emptyList.isNotEmpty()) + assertTrue(nonEmptyList.isNotEmpty()) + } + + @Test + fun `different list instances have independent counters`() { + val containers1 = createContainers(3) + val containers2 = createContainers(3) + + val list1 = RoundRobinContainerList.of(containers1) + val list2 = RoundRobinContainerList.of(containers2) + + // Advance list1's counter + list1.getNextStartIndex() // 0 + list1.getNextStartIndex() // 1 + list1.getNextStartIndex() // 2 + + // list2's counter should still be at 0 + assertEquals(0, list2.getNextStartIndex()) + + // Verify list1 continues from where it left off + assertEquals(0, list1.getNextStartIndex()) // wraps back to 0 + } + + @Test + fun `shared list between multiple simulated uploaders - verifies fix`() { + // This test specifically validates the fix for the reported issue: + // "if uploader A and uploader B (sharing the same configurationcache) upload, + // they will both use storage 0 first" + + val containers = createContainers(4) + val sharedList = RoundRobinContainerList.of(containers) + + // Simulate Uploader A's first upload + val uploaderAFirst = sharedList.getNextStartIndex() + assertEquals(0, uploaderAFirst, "Uploader A's first upload should use container 0") + + // Simulate Uploader B's first upload (SHOULD use container 1, not 0!)
+ val uploaderBFirst = sharedList.getNextStartIndex() + assertEquals(1, uploaderBFirst, "Uploader B's first upload should use container 1, NOT 0") + + // Simulate Uploader A's second upload + val uploaderASecond = sharedList.getNextStartIndex() + assertEquals(2, uploaderASecond, "Uploader A's second upload should use container 2") + + // Simulate Uploader B's second upload + val uploaderBSecond = sharedList.getNextStartIndex() + assertEquals(3, uploaderBSecond, "Uploader B's second upload should use container 3") + + // Back to container 0 + val uploaderCFirst = sharedList.getNextStartIndex() + assertEquals(0, uploaderCFirst, "Uploader C's first upload should wrap back to container 0") + } +} \ No newline at end of file diff --git a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java index 1d4f81c0..14349a61 100644 --- a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java +++ b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java @@ -784,7 +784,7 @@ private static TokenCredential buildIngestV2Credential(@NotNull IngestV2Quicksta String csvData = "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null"; InputStream csvStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(csvData).array()); StreamSource csvSource = new StreamSource(csvStream, Format.csv, CompressionType.NONE, UUID.randomUUID(), false); - futures.add(queuedIngestClient.ingestAsync(config.getDatabaseName(), config.getTableName(), csvSource, csvProps) + futures.add(queuedIngestClient.ingestAsyncJava(config.getDatabaseName(), config.getTableName(), csvSource, csvProps) .thenCompose(response -> { closeQuietly(csvStream); System.out.println("CSV stream ingestion queued. Operation ID: " + response.getIngestResponse().getIngestionOperationId()); @@ -794,7 +794,7 @@ private static TokenCredential buildIngestV2Credential(@NotNull IngestV2Quicksta InputStream jsonStream = Files.newInputStream(resolveQuickstartPath("dataset.json")); StreamSource jsonSource = new StreamSource(jsonStream, Format.json, CompressionType.NONE, UUID.randomUUID(), false); IngestRequestProperties jsonProps = buildIngestV2RequestProperties(config, ingestV2Config, ingestV2Config.getDataMappingName()); - futures.add(queuedIngestClient.ingestAsync(config.getDatabaseName(), config.getTableName(), jsonSource, jsonProps) + futures.add(queuedIngestClient.ingestAsyncJava(config.getDatabaseName(), config.getTableName(), jsonSource, jsonProps) .thenCompose(response -> { closeQuietly(jsonStream); System.out.println("JSON stream ingestion queued. Operation ID: " + response.getIngestResponse().getIngestionOperationId()); @@ -811,7 +811,7 @@ private static TokenCredential buildIngestV2Credential(@NotNull IngestV2Quicksta IngestRequestProperties csvProps = buildIngestV2RequestProperties(config, ingestV2Config, null); FileSource csvFileSource = new FileSource(resolveQuickstartPath("dataset.csv"), Format.csv, UUID.randomUUID(), CompressionType.NONE); - futures.add(queuedIngestClient.ingestAsync(config.getDatabaseName(), config.getTableName(), csvFileSource, csvProps) + futures.add(queuedIngestClient.ingestAsyncJava(config.getDatabaseName(), config.getTableName(), csvFileSource, csvProps) .thenCompose(response -> { System.out.println("CSV file ingestion queued. 
Operation ID: " + response.getIngestResponse().getIngestionOperationId()); return trackIngestV2Operation(config, ingestV2Config, queuedIngestClient, response, "CSV File"); @@ -819,7 +819,7 @@ private static TokenCredential buildIngestV2Credential(@NotNull IngestV2Quicksta FileSource jsonFileSource = new FileSource(resolveQuickstartPath("dataset.json"), Format.json, UUID.randomUUID(), CompressionType.NONE); IngestRequestProperties jsonProps = buildIngestV2RequestProperties(config, ingestV2Config, ingestV2Config.getDataMappingName()); - futures.add(queuedIngestClient.ingestAsync(config.getDatabaseName(), config.getTableName(), jsonFileSource, jsonProps) + futures.add(queuedIngestClient.ingestAsyncJava(config.getDatabaseName(), config.getTableName(), jsonFileSource, jsonProps) .thenCompose(response -> { System.out.println("JSON file ingestion queued. Operation ID: " + response.getIngestResponse().getIngestionOperationId()); return trackIngestV2Operation(config, ingestV2Config, queuedIngestClient, response, "JSON File"); @@ -894,7 +894,7 @@ private static TokenCredential buildIngestV2Credential(@NotNull IngestV2Quicksta } System.out.println("Ingesting " + blobSources.size() + " blobs as a batch..."); - return queuedIngestClient.ingestAsync(config.getDatabaseName(), config.getTableName(), blobSources, props) + return queuedIngestClient.ingestAsyncJava(config.getDatabaseName(), config.getTableName(), blobSources, props) .thenCompose(response -> { System.out.println("Batch ingestion queued. Operation ID: " + response.getIngestResponse().getIngestionOperationId()); @@ -935,14 +935,14 @@ private static TokenCredential buildIngestV2Credential(@NotNull IngestV2Quicksta Duration pollTimeout = Duration.ofMinutes(ingestV2Config.getPollingTimeoutMinutes()); System.out.println("\n--- Tracking " + operationName + " ---"); - return queuedIngestClient.getOperationDetailsAsync(operation) + return queuedIngestClient.getOperationDetailsAsyncJava(operation) .thenCompose(initialDetails -> { System.out.println("[" + operationName + "] Initial Operation Details:"); printIngestV2StatusResponse(initialDetails); System.out.println("[" + operationName + "] Polling for completion..."); return queuedIngestClient.pollForCompletion(operation, pollInterval, pollTimeout); }) - .thenCompose(fin -> queuedIngestClient.getOperationDetailsAsync(operation)) + .thenCompose(fin -> queuedIngestClient.getOperationDetailsAsyncJava(operation)) .thenAccept(finalDetails -> { System.out.println("[" + operationName + "] Final Operation Details:"); printIngestV2StatusResponse(finalDetails); diff --git a/samples/src/main/java/ingestv2/AzureBlobRestCustomUploader.java b/samples/src/main/java/ingestv2/AzureBlobRestCustomUploader.java index b0ca14f8..5f1b3bbd 100644 --- a/samples/src/main/java/ingestv2/AzureBlobRestCustomUploader.java +++ b/samples/src/main/java/ingestv2/AzureBlobRestCustomUploader.java @@ -8,6 +8,7 @@ import com.azure.identity.ChainedTokenCredentialBuilder; import com.azure.identity.ClientSecretCredentialBuilder; import com.microsoft.azure.kusto.data.StringUtils; +import com.microsoft.azure.kusto.ingest.v2.IngestClientBase; import com.microsoft.azure.kusto.ingest.v2.common.models.mapping.IngestionMapping; import com.microsoft.azure.kusto.ingest.v2.source.BlobSource; import com.microsoft.azure.kusto.ingest.v2.source.CompressionType; @@ -297,7 +298,7 @@ public static void main(String[] args) throws Exception { System.out.println(" .build();"); System.out.println(); System.out.println("// 5. 
Ingest - the custom uploader handles the upload!"); - System.out.println("client.ingestAsync(fileSource, properties).join();"); + System.out.println("client.ingestAsyncJava(fileSource, properties).join();"); System.out.println("```"); return; } @@ -330,12 +331,9 @@ public static void main(String[] args) throws Exception { } // Build the ingest URL (DM endpoint) - // The correct pattern: https://cluster.region.kusto.windows.net -> https://ingest-cluster.region.kusto.windows.net - String dmUrl = engineEndpoint; - if (engineEndpoint.startsWith("https://")) { - dmUrl = engineEndpoint.replace("https://", "https://ingest-"); - } else if (engineEndpoint.startsWith("http://")) { - dmUrl = engineEndpoint.replace("http://", "http://ingest-"); + String dmUrl = IngestClientBase.getIngestionEndpoint(engineEndpoint); + if (dmUrl == null) { + dmUrl = engineEndpoint; // Fallback if transformation not applicable } System.out.println("DM Endpoint: " + dmUrl); @@ -397,7 +395,7 @@ public static void main(String[] args) throws Exception { System.out.println(" (This will use the custom uploader to upload to Azure Blob Storage!)"); // Perform ingestion - the custom uploader handles the upload! - var response = queuedIngestClient.ingestAsync(database, table, fileSource, properties).get(); + var response = queuedIngestClient.ingestAsyncJava(database, table, fileSource, properties).get(); System.out.println("\n5. Ingestion queued successfully!"); System.out.println(" Operation ID: " + response.getIngestResponse().getIngestionOperationId()); @@ -415,7 +413,7 @@ public static void main(String[] args) throws Exception { System.out.println("\n6. Ingesting JSON file with mapping: " + resourcesDirectory + "dataset.json"); - var jsonResponse = queuedIngestClient.ingestAsync(database, table, jsonFileSource, jsonProperties).get(); + var jsonResponse = queuedIngestClient.ingestAsyncJava(database, table, jsonFileSource, jsonProperties).get(); System.out.println(" JSON Ingestion queued successfully!"); System.out.println(" Operation ID: " + jsonResponse.getIngestResponse().getIngestionOperationId()); @@ -433,7 +431,7 @@ public static void main(String[] args) throws Exception { System.out.println("\n7. Ingesting from stream (CSV data)"); - var streamResponse = queuedIngestClient.ingestAsync(database, table,streamSource, streamProperties).get(); + var streamResponse = queuedIngestClient.ingestAsyncJava(database, table,streamSource, streamProperties).get(); System.out.println(" Stream Ingestion queued successfully!"); System.out.println(" Operation ID: " + streamResponse.getIngestResponse().getIngestionOperationId()); @@ -444,7 +442,7 @@ public static void main(String[] args) throws Exception { System.out.println(" 1. AzureBlobRestCustomUploader implements ICustomUploader"); System.out.println(" 2. CustomUploaderHelper.asUploader() converts to IUploader"); System.out.println(" 3. QueuedIngestClientBuilder.withUploader() configures the custom uploader"); - System.out.println(" 4. client.ingestAsync() internally uses the custom uploader!"); + System.out.println(" 4. 
client.ingestAsyncJava() internally uses the custom uploader!"); System.out.println(); System.out.println("The same pattern works for any other source such as S3/GCP - just implement ICustomUploader!"); diff --git a/samples/src/main/java/ingestv2/ManagedStreamingIngestV2.java b/samples/src/main/java/ingestv2/ManagedStreamingIngestV2.java index 0ea238f6..fad745f9 100644 --- a/samples/src/main/java/ingestv2/ManagedStreamingIngestV2.java +++ b/samples/src/main/java/ingestv2/ManagedStreamingIngestV2.java @@ -136,7 +136,7 @@ static void ingestFromStream() throws Exception { .build(); System.out.println("Ingesting small CSV data from string..."); - ExtendedIngestResponse csvResponse = managedStreamingIngestClient.ingestAsync(database, table, csvStreamSource, csvProperties).get(); + ExtendedIngestResponse csvResponse = managedStreamingIngestClient.ingestAsyncJava(database, table, csvStreamSource, csvProperties).get(); printIngestionResult("CSV String", csvResponse); // Example 2: Ingest from compressed CSV file @@ -152,7 +152,7 @@ static void ingestFromStream() throws Exception { System.out.println("Ingesting compressed CSV file..."); ExtendedIngestResponse compressedResponse = managedStreamingIngestClient - .ingestAsync(database, table, compressedStreamSource, csvProperties) + .ingestAsyncJava(database, table, compressedStreamSource, csvProperties) .get(); printIngestionResult("Compressed CSV", compressedResponse); compressedCsvStream.close(); @@ -173,7 +173,7 @@ static void ingestFromStream() throws Exception { .build(); System.out.println("Ingesting JSON file with mapping..."); - ExtendedIngestResponse jsonResponse = managedStreamingIngestClient.ingestAsync(database, table, jsonStreamSource, jsonProperties).get(); + ExtendedIngestResponse jsonResponse = managedStreamingIngestClient.ingestAsyncJava(database, table, jsonStreamSource, jsonProperties).get(); printIngestionResult("JSON with Mapping", jsonResponse); jsonStream.close(); } @@ -200,7 +200,7 @@ static void ingestFromFile() throws Exception { .build(); System.out.println("Ingesting CSV file..."); - ExtendedIngestResponse csvResponse = managedStreamingIngestClient.ingestAsync(database, table, csvFileSource, csvProperties).get(); + ExtendedIngestResponse csvResponse = managedStreamingIngestClient.ingestAsyncJava(database, table, csvFileSource, csvProperties).get(); printIngestionResult("CSV File", csvResponse); // Example 2: Ingest compressed JSON file with mapping @@ -217,7 +217,7 @@ static void ingestFromFile() throws Exception { .build(); System.out.println("Ingesting compressed JSON file with mapping..."); - ExtendedIngestResponse jsonResponse = managedStreamingIngestClient.ingestAsync(database, table, jsonFileSource, jsonProperties).get(); + ExtendedIngestResponse jsonResponse = managedStreamingIngestClient.ingestAsyncJava(database, table, jsonFileSource, jsonProperties).get(); printIngestionResult("Compressed JSON File", jsonResponse); } @@ -274,7 +274,7 @@ static void demonstrateFallbackTracking() throws Exception { System.out.println("(Watch for fallback log messages from ManagedStreamingIngestClient)"); System.out.println(); - ExtendedIngestResponse response = managedStreamingIngestClient.ingestAsync(database, table, largeStreamSource, properties).get(); + ExtendedIngestResponse response = managedStreamingIngestClient.ingestAsyncJava(database, table, largeStreamSource, properties).get(); printIngestionResult("Large Data Ingestion", response); // The large data should trigger queued fallback @@ -290,7 +290,7 @@ static void 
demonstrateFallbackTracking() throws Exception {
             response.getIngestionType());
 
         // Get initial operation details
-        CompletableFuture<StatusResponse> detailsFuture = managedStreamingIngestClient.getOperationDetailsAsync(operation);
+        CompletableFuture<StatusResponse> detailsFuture = managedStreamingIngestClient.getOperationDetailsAsyncJava(operation);
         StatusResponse details = detailsFuture.get();
         printStatusResponse("Initial Status", details);
 
@@ -416,7 +416,7 @@ private static StatusResponse pollForCompletionManually(
         long intervalMillis = pollingInterval.toMillis();
 
         while (System.currentTimeMillis() - startTime < timeoutMillis) {
-            StatusResponse status = managedStreamingIngestClient.getOperationDetailsAsync(operation).get();
+            StatusResponse status = managedStreamingIngestClient.getOperationDetailsAsyncJava(operation).get();
 
             // Check if completed (no more in-progress items)
             Status summary = status.getStatus();
@@ -438,7 +438,7 @@ private static StatusResponse pollForCompletionManually(
 
         // Timeout reached, return latest status
         System.out.println("Polling timeout reached. Returning latest status.");
-        return managedStreamingIngestClient.getOperationDetailsAsync(operation).get();
+        return managedStreamingIngestClient.getOperationDetailsAsyncJava(operation).get();
     }
 
     /** Prints the ingestion result including which method (streaming or queued) was used. */
diff --git a/samples/src/main/java/ingestv2/QueuedIngestV2.java b/samples/src/main/java/ingestv2/QueuedIngestV2.java
index bf68de0d..78d61f9f 100644
--- a/samples/src/main/java/ingestv2/QueuedIngestV2.java
+++ b/samples/src/main/java/ingestv2/QueuedIngestV2.java
@@ -8,6 +8,7 @@
 import com.azure.identity.ChainedTokenCredential;
 import com.azure.identity.ClientSecretCredentialBuilder;
 import com.microsoft.azure.kusto.data.StringUtils;
+import com.microsoft.azure.kusto.ingest.v2.IngestClientBase;
 import com.microsoft.azure.kusto.ingest.v2.builders.QueuedIngestClientBuilder;
 import com.microsoft.azure.kusto.ingest.v2.client.IngestionOperation;
 import com.microsoft.azure.kusto.ingest.v2.client.QueuedIngestClient;
@@ -47,10 +48,14 @@
 import java.util.concurrent.TimeUnit;
 
 /**
- * Sample demonstrating queued ingestion using the new ingest-v2 API. This is the modern API that
- * uses Kotlin-based clients with coroutines, providing better async support and a cleaner API
- * design. Queued ingestion is asynchronous and provides reliable, high-throughput data ingestion
- * with operation tracking capabilities.
+ * Sample demonstrating queued ingestion using the new ingest-v2 API.
+ *
+ *
<p>
This sample shows simple, blocking examples that are easy to understand. + * For production use, consider using the async API with CompletableFuture to avoid + * blocking threads - see {@link #advancedAsyncIngestionExample()} for an example. + * + *
<p>
Queued ingestion is asynchronous on the server side and provides reliable, + * high-throughput data ingestion with operation tracking capabilities. */ public class QueuedIngestV2 { @@ -100,19 +105,13 @@ public static void main(String[] args) { System.out.println("Queued Ingest Client created successfully"); - // Collect all futures for non-blocking execution - List> allFutures = new ArrayList<>(); + // Run simple blocking ingestion examples + ingestFromStream(); + ingestFromFile(); + ingestMultipleSources(); - // Run ingestion examples - allFutures.addAll(ingestFromStream()); - allFutures.addAll(ingestFromFile()); - allFutures.add(ingestMultipleSources()); - - // Wait for all operations to complete - CompletableFuture allOf = CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0])); - - System.out.println("\nWaiting for all ingestion operations to complete..."); - allOf.get(5, TimeUnit.MINUTES); + // Uncomment to see the advanced async pattern: + // advancedAsyncIngestionExample(); System.out.println("\nAll ingestion operations completed successfully!"); @@ -127,173 +126,152 @@ public static void main(String[] args) { } /** - * Demonstrates ingestion from various stream sources including: - In-memory string data as CSV - * - Compressed file stream (CSV) - JSON file stream with mapping + * Demonstrates ingestion from various stream sources including: + * - In-memory string data as CSV + * - Compressed file stream (CSV) + * - JSON file stream with mapping * - *

<p>Shows both source configuration (with defaults) and source configuration (with full control) approaches.
+     *
<p>
These examples use blocking .get() calls for simplicity. In production, + * consider using the async API with CompletableFuture composition instead. */ - static List> ingestFromStream() throws Exception { + static void ingestFromStream() throws Exception { System.out.println("\n=== Queued Ingestion from Streams ==="); - List> futures = new ArrayList<>(); + String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; - // Example 1: Ingest from in-memory CSV string (only 2 required parameters) - // sourceCompression defaults to CompressionType.NONE, sourceId auto-generated, baseName null, leaveOpen false + // Example 1: Ingest from in-memory CSV string String csvData = "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null"; InputStream csvInputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(csvData).array()); - StreamSource csvStreamSource = new StreamSource(csvInputStream, Format.csv); + try { + StreamSource csvStreamSource = new StreamSource(csvInputStream, Format.csv); - IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create() - .withEnableTracking(true) - .build(); + IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); - System.out.println("Queueing CSV data from string..."); - CompletableFuture csvFuture = - queuedIngestClient - .ingestAsync(database, table, csvStreamSource, csvProperties) - .thenCompose( - response -> { - System.out.println( - "CSV ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation(response, "CSV Stream"); - }) - .whenComplete((unused, throwable) -> closeQuietly(csvInputStream)); - futures.add(csvFuture); - - // Example 2: Ingest from compressed CSV file (all 6 parameters needed) - // Explicitly specify compression, sourceId, baseName, and leaveOpen - String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; - InputStream compressedCsvStream = new ByteArrayInputStream(readResourceBytes(resourcesDirectory, "dataset.csv.gz")); + System.out.println("Ingesting CSV data from string..."); + // Note: ingestAsync returns a CompletableFuture. We call .get() to block and wait for completion. + // In production, you might compose futures instead of blocking - see advancedAsyncIngestionExample(). + ExtendedIngestResponse response = queuedIngestClient + .ingestAsyncJava(database, table, csvStreamSource, csvProperties) + .get(2, TimeUnit.MINUTES); - StreamSource compressedStreamSource = - new StreamSource( - compressedCsvStream, - Format.csv, - CompressionType.GZIP, - UUID.randomUUID(), - false); - - System.out.println("Queueing compressed CSV file..."); - CompletableFuture compressedFuture = - queuedIngestClient - .ingestAsync(database, table, compressedStreamSource, csvProperties) - .thenCompose( - response -> { - System.out.println( - "Compressed CSV ingestion queued. 
Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation( - response, "Compressed CSV Stream"); - }) - .whenComplete((unused, throwable) -> closeQuietly(compressedCsvStream)); - futures.add(compressedFuture); - - // Example 3: Ingest JSON with mapping - with defaults - // Uses defaults: sourceCompression=NONE, auto-generated sourceId, leaveOpen=false - InputStream jsonStream = - new ByteArrayInputStream(readResourceBytes(resourcesDirectory, "dataset.json")); - - StreamSource jsonStreamSource = new StreamSource(jsonStream, Format.json); - - IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON); - IngestRequestProperties jsonProperties = - IngestRequestPropertiesBuilder.create() - .withIngestionMapping(mapping) - .withEnableTracking(true) - .build(); + System.out.println("CSV ingestion queued successfully. Operation ID: " + + response.getIngestResponse().getIngestionOperationId()); + trackIngestionOperation(response, "CSV Stream"); + } finally { + closeQuietly(csvInputStream); + } + + // Example 2: Ingest from compressed CSV file + InputStream compressedCsvStream = new ByteArrayInputStream( + readResourceBytes(resourcesDirectory, "dataset.csv.gz")); - System.out.println("Queueing JSON file with mapping..."); - CompletableFuture jsonFuture = - queuedIngestClient - .ingestAsync(database, table, jsonStreamSource, jsonProperties) - .thenCompose( - response -> { - System.out.println( - "JSON ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation(response, "JSON Stream"); - }) - .whenComplete((unused, throwable) -> closeQuietly(jsonStream)); - futures.add(jsonFuture); - - return futures; + try { + StreamSource compressedStreamSource = new StreamSource( + compressedCsvStream, + Format.csv, + CompressionType.GZIP, + UUID.randomUUID(), + false); + + IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); + + System.out.println("Ingesting compressed CSV file..."); + ExtendedIngestResponse response = queuedIngestClient + .ingestAsyncJava(database, table, compressedStreamSource, csvProperties) + .get(2, TimeUnit.MINUTES); + + System.out.println("Compressed CSV ingestion queued successfully. Operation ID: " + + response.getIngestResponse().getIngestionOperationId()); + trackIngestionOperation(response, "Compressed CSV Stream"); + } finally { + closeQuietly(compressedCsvStream); + } + + // Example 3: Ingest JSON with mapping + InputStream jsonStream = new ByteArrayInputStream( + readResourceBytes(resourcesDirectory, "dataset.json")); + + try { + StreamSource jsonStreamSource = new StreamSource(jsonStream, Format.json); + + IngestionMapping mapping = new IngestionMapping( + mappingName, IngestionMapping.IngestionMappingType.JSON); + IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create() + .withIngestionMapping(mapping) + .withEnableTracking(true) + .build(); + + System.out.println("Ingesting JSON file with mapping..."); + ExtendedIngestResponse response = queuedIngestClient + .ingestAsyncJava(database, table, jsonStreamSource, jsonProperties) + .get(2, TimeUnit.MINUTES); + + System.out.println("JSON ingestion queued successfully. 
Operation ID: " + + response.getIngestResponse().getIngestionOperationId()); + trackIngestionOperation(response, "JSON Stream"); + } finally { + closeQuietly(jsonStream); + } } /** - * Demonstrates ingestion from file sources including: - CSV file - Compressed JSON file with - * mapping + * Demonstrates ingestion from file sources including: + * - CSV file + * - Compressed JSON file with mapping * - * Shows both source configuration with defaults and source configuration with all params approaches. + *
<p>
These examples use blocking .get() calls for simplicity. */ - static List> ingestFromFile() { + static void ingestFromFile() throws Exception { System.out.println("\n=== Queued Ingestion from Files ==="); - List> futures = new ArrayList<>(); - String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; - // Example 1: Ingest CSV file - with defaults - // compressionType auto-detected from filename (.csv = NONE), sourceId auto-generated, baseName auto-extracted - FileSource csvFileSource = new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv); + // Example 1: Ingest CSV file + FileSource csvFileSource = new FileSource( + Paths.get(resourcesDirectory + "dataset.csv"), Format.csv); - IngestRequestProperties csvProperties = - IngestRequestPropertiesBuilder.create() - .withEnableTracking(true) - .build(); + IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); - System.out.println("Queueing CSV file..."); - CompletableFuture csvFuture = - queuedIngestClient - .ingestAsync(database, table, csvFileSource, csvProperties) - .thenCompose( - response -> { - System.out.println( - "CSV file ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation(response, "CSV File"); - }); - futures.add(csvFuture); - - // Example 2: Ingest compressed JSON file with mapping - with all parameters specified - // Explicitly specify sourceId, compression (auto-detected from .gz), and baseName for full control - FileSource jsonFileSource = - new FileSource( - Paths.get(resourcesDirectory + "dataset.jsonz.gz"), - Format.json, - UUID.randomUUID(), - CompressionType.GZIP); + System.out.println("Ingesting CSV file..."); + ExtendedIngestResponse csvResponse = queuedIngestClient + .ingestAsyncJava(database, table, csvFileSource, csvProperties) + .get(2, TimeUnit.MINUTES); - IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON); - IngestRequestProperties jsonProperties = - IngestRequestPropertiesBuilder.create() - .withIngestionMapping(mapping) - .withEnableTracking(true) - .build(); + System.out.println("CSV file ingestion queued successfully. Operation ID: " + + csvResponse.getIngestResponse().getIngestionOperationId()); + trackIngestionOperation(csvResponse, "CSV File"); + + // Example 2: Ingest compressed JSON file with mapping + FileSource jsonFileSource = new FileSource( + Paths.get(resourcesDirectory + "dataset.jsonz.gz"), + Format.json, + UUID.randomUUID(), + CompressionType.GZIP); + + IngestionMapping mapping = new IngestionMapping( + mappingName, IngestionMapping.IngestionMappingType.JSON); + IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create() + .withIngestionMapping(mapping) + .withEnableTracking(true) + .build(); + + System.out.println("Ingesting compressed JSON file with mapping..."); + ExtendedIngestResponse jsonResponse = queuedIngestClient + .ingestAsyncJava(database, table, jsonFileSource, jsonProperties) + .get(2, TimeUnit.MINUTES); - System.out.println("Queueing compressed JSON file with mapping..."); - CompletableFuture jsonFuture = - queuedIngestClient - .ingestAsync(database, table, jsonFileSource, jsonProperties) - .thenCompose( - response -> { - System.out.println( - "Compressed JSON file ingestion queued. 
Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation( - response, "Compressed JSON File"); - }); - futures.add(jsonFuture); - - return futures; + System.out.println("Compressed JSON file ingestion queued successfully. Operation ID: " + + jsonResponse.getIngestResponse().getIngestionOperationId()); + trackIngestionOperation(jsonResponse, "Compressed JSON File"); } /** @@ -307,21 +285,12 @@ static List> ingestFromFile() { * BlobSource list to ingestAsync * * - *
<p>
This example uses public blob URLs from the Kusto sample files to demonstrate - * multi-blob batch ingestion. All blobs must have the same format. + *
<p>
This example uses public blob URLs from the Kusto sample files.
      */
-    static CompletableFuture<Void> ingestMultipleSources() {
+    static void ingestMultipleSources() throws Exception {
         System.out.println("\n=== Queued Ingestion from Multiple Blob Sources (Batch) ===");
 
-        // Multi-source API only accepts BlobSource - not FileSource or StreamSource.
-        // If you have local files, you must upload them to blob storage first.
-        // Here we use public sample blob URLs from Kusto sample files to demonstrate the pattern.
-        // IMPORTANT: All sources in a batch must have the same format!
-        // BlobSource constructor requires: blobPath, format, sourceId, compressionType, baseName
-
-        // Using multiple JSON files from Kusto public sample files
-        // All files are JSON format - this is required for batch ingestion
         BlobSource blob1 = new BlobSource(
                 "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json",
                 Format.json,
                 UUID.randomUUID(),
                 CompressionType.NONE);
@@ -334,29 +303,99 @@ static CompletableFuture<Void> ingestMultipleSources() {
                 UUID.randomUUID(),
                 CompressionType.NONE);
 
-        // Create list with all blob sources - all must have identical format
         List<BlobSource> blobSources = Arrays.asList(blob1, blob2);
 
-        IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON);
-        IngestRequestProperties properties =
-                IngestRequestPropertiesBuilder.create()
-                        .withIngestionMapping(mapping)
-                        .withEnableTracking(true)
-                        .build();
-
-        System.out.println("Queueing " + blobSources.size() + " blob sources in batch...");
+        IngestionMapping mapping = new IngestionMapping(
+                mappingName, IngestionMapping.IngestionMappingType.JSON);
+        IngestRequestProperties properties = IngestRequestPropertiesBuilder.create()
+                .withIngestionMapping(mapping)
+                .withEnableTracking(true)
+                .build();
+
+        System.out.println("Ingesting " + blobSources.size() + " blob sources in batch...");
         for (int i = 0; i < blobSources.size(); i++) {
             System.out.println("  Blob " + (i + 1) + ": " + blobSources.get(i).getName());
         }
 
-        return queuedIngestClient
-                .ingestAsync(database, table, blobSources, properties)
-                .thenCompose(response -> {
-                    System.out.println(
-                            "Batch ingestion queued. Operation ID: "
-                                    + response.getIngestResponse().getIngestionOperationId());
-                    System.out.println("Number of sources in batch: " + blobSources.size());
-                    return trackIngestionOperation(response, "Batch Blob Ingestion");
+        ExtendedIngestResponse response = queuedIngestClient
+                .ingestAsyncJava(database, table, blobSources, properties)
+                .get(2, TimeUnit.MINUTES);
+
+        System.out.println("Batch ingestion queued successfully. Operation ID: "
+                + response.getIngestResponse().getIngestionOperationId());
+        System.out.println("Number of sources in batch: " + blobSources.size());
+        trackIngestionOperation(response, "Batch Blob Ingestion");
+    }
+
+    /**
+     * Advanced example demonstrating non-blocking async ingestion using CompletableFuture.
+     *
+     *
<p>This pattern is recommended for production use when you need to:
+     * <ul>
+     *   <li>Ingest multiple sources concurrently without blocking threads</li>
+     *   <li>Compose multiple async operations</li>
+     *   <li>Handle results/errors asynchronously</li>
+     * </ul>
+     *
+     * <p>
The key difference from the simple examples is that we compose futures
+     * instead of calling .get() immediately, allowing the operations to run concurrently.
+     */
+    static void advancedAsyncIngestionExample() throws Exception {
+        System.out.println("\n=== Advanced Async Ingestion Example ===");
+
+        String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/";
+
+        // Create multiple sources to ingest concurrently
+        FileSource csvFile = new FileSource(
+                Paths.get(resourcesDirectory + "dataset.csv"), Format.csv);
+        FileSource jsonFile = new FileSource(
+                Paths.get(resourcesDirectory + "dataset.jsonz.gz"),
+                Format.json,
+                UUID.randomUUID(),
+                CompressionType.GZIP);
+
+        IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create()
+                .withEnableTracking(true)
+                .build();
+
+        IngestionMapping mapping = new IngestionMapping(
+                mappingName, IngestionMapping.IngestionMappingType.JSON);
+        IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create()
+                .withIngestionMapping(mapping)
+                .withEnableTracking(true)
+                .build();
+
+        // Start both ingestions concurrently - don't call .get() yet!
+        System.out.println("Starting concurrent ingestion of CSV and JSON files...");
+
+        CompletableFuture<ExtendedIngestResponse> csvFuture = queuedIngestClient
+                .ingestAsyncJava(database, table, csvFile, csvProperties)
+                .thenApply(response -> {
+                    System.out.println("CSV ingestion queued. Operation ID: "
+                            + response.getIngestResponse().getIngestionOperationId());
+                    return response;
+                });
+
+        CompletableFuture<ExtendedIngestResponse> jsonFuture = queuedIngestClient
+                .ingestAsyncJava(database, table, jsonFile, jsonProperties)
+                .thenApply(response -> {
+                    System.out.println("JSON ingestion queued. Operation ID: "
+                            + response.getIngestResponse().getIngestionOperationId());
+                    return response;
+                });
+
+        // Compose futures to track both operations
+        CompletableFuture<Void> csvTracking = csvFuture
+                .thenCompose(response -> trackIngestionOperationAsync(response, "CSV File (Async)"));
+
+        CompletableFuture<Void> jsonTracking = jsonFuture
+                .thenCompose(response -> trackIngestionOperationAsync(response, "JSON File (Async)"));
+
+        // Wait for all operations to complete
+        System.out.println("Waiting for all async operations to complete...");
+        CompletableFuture.allOf(csvTracking, jsonTracking).get(5, TimeUnit.MINUTES);
+
+        System.out.println("All async ingestion operations completed!");
+    }
 
@@ -367,126 +406,97 @@ static CompletableFuture<Void> ingestMultipleSources() {
     /**
      *
     * <ol>
     *   <li>Create a ManagedUploader with proper configuration</li>
     *   <li>Create list of LocalSource (FileSource) objects</li>
-    *   <li>Call uploader.uploadManyAsyncJava(localSources) to upload all files to blob storage</li>
+    *   <li>Call uploader.uploadManyAsync(localSources) to upload all files to blob storage</li>
     *   <li>Convert successful upload results to BlobSource list</li>
-    *   <li>Call queuedIngestClient.ingestAsync(blobSources, properties) to ingest as a batch</li>
+    *   <li>Call queuedIngestClient.ingestAsyncJava(blobSources, properties) to ingest as a batch</li>
     * </ol>
-    *
-    * <p>
This approach allows batch ingestion of local files by first uploading them - * to blob storage, which is required because the multi-source API only accepts BlobSource. */ - static CompletableFuture ingestMultipleLocalFilesViaBlobUpload( - String engineEndpoint, ChainedTokenCredential credential) { + static void ingestMultipleLocalFilesViaBlobUpload( + String engineEndpoint, ChainedTokenCredential credential) throws Exception { System.out.println("\n=== Queued Ingestion: Upload Local Files to Blob, Then Ingest ==="); // Step 1: Create configuration cache (needed for ManagedUploader) - String dmUrl = engineEndpoint.replace(".kusto.", ".ingest-"); - - ConfigurationCache configCache = - DefaultConfigurationCache.create( - dmUrl, - credential, - new ClientDetails("QueuedIngestV2Sample", "1.0", "ingest-v2-sample")); - - // Step 2: Create ManagedUploader for batch uploading local files to blob storage - ManagedUploader uploader = - ManagedUploader.builder() - .withConfigurationCache(configCache) - .withRetryPolicy(new SimpleRetryPolicy()) - .withMaxConcurrency(10) - .withMaxDataSize(4L * 1024 * 1024 * 1024) // 4GB max size - .withUploadMethod(UploadMethod.STORAGE) - .withTokenCredential(credential) - .build(); + String dmUrl = IngestClientBase.getIngestionEndpoint(engineEndpoint); + if (dmUrl == null) { + dmUrl = engineEndpoint; + } - System.out.println("ManagedUploader created for batch upload"); + ConfigurationCache configCache = DefaultConfigurationCache.create( + dmUrl, + credential, + new ClientDetails("QueuedIngestV2Sample", "1.0", "ingest-v2-sample")); + + // Step 2: Create ManagedUploader + ManagedUploader uploader = ManagedUploader.builder() + .withConfigurationCache(configCache) + .withRetryPolicy(new SimpleRetryPolicy()) + .withMaxConcurrency(10) + .withMaxDataSize(4L * 1024 * 1024 * 1024) // 4GB max size + .withUploadMethod(UploadMethod.STORAGE) + .withTokenCredential(credential) + .build(); - // Step 3: Prepare list of local files to upload (all same format - CSV) - String resourcesDirectory = System.getProperty("user.dir") + "/src/main/resources/"; + try { + System.out.println("ManagedUploader created for batch upload"); - // IMPORTANT: All files must have the same format for batch ingestion! 
-        FileSource file1 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv);
-        FileSource file2 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv.gz"), Format.csv);
+            // Step 3: Prepare local files (all same format - CSV)
+            String resourcesDirectory = System.getProperty("user.dir") + "/src/main/resources/";
+            FileSource file1 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv);
+            FileSource file2 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv.gz"), Format.csv);
+            List<LocalSource> localSources = Arrays.asList(file1, file2);
 
-        List<LocalSource> localSources = Arrays.asList(file1, file2);
+            System.out.println("Prepared " + localSources.size() + " local files for upload");
 
-        System.out.println("Prepared " + localSources.size() + " local files for upload:");
-        for (LocalSource source : localSources) {
-            System.out.println("  - " + source.getName() + " (format: " + source.getFormat() + ")");
-        }
+            // Step 4: Upload all local files to blob storage
+            System.out.println("Uploading files to blob storage...");
+            var uploadResults = uploader.uploadManyAsync(localSources).get(5, TimeUnit.MINUTES);
 
-        IngestRequestProperties properties =
-                IngestRequestPropertiesBuilder.create()
-                        .withEnableTracking(true)
-                        .build();
+            System.out.println("Upload completed:");
+            System.out.println("  Successes: " + uploadResults.getSuccesses().size());
+            System.out.println("  Failures: " + uploadResults.getFailures().size());
 
-        // Step 4: Upload all local files to blob storage using uploadManyAsync
-        // Note: The Kotlin suspend function uploadManyAsync is exposed to Java as uploadManyAsync
-        // (via @JvmName annotation) and returns CompletableFuture
-        System.out.println("Uploading " + localSources.size() + " files to blob storage...");
-
-        return uploader.uploadManyAsync(localSources)
-                .thenCompose(uploadResults -> {
-                    // Step 5: Process upload results
-                    System.out.println("Upload completed:");
-                    System.out.println("  Successes: " + uploadResults.getSuccesses().size());
-                    System.out.println("  Failures: " + uploadResults.getFailures().size());
-
-                    // Log any failures
-                    for (var failure : uploadResults.getFailures()) {
-                        System.err.println("  Upload failed for " + failure.getSourceName()
-                                + ": " + failure.getErrorMessage());
-                    }
-
-                    // Step 6: Convert successful uploads to BlobSource list
-                    List<BlobSource> blobSources = new ArrayList<>();
-                    for (var success : uploadResults.getSuccesses()) {
-                        System.out.println("  Uploaded: " + success.getSourceName()
-                                + " -> " + success.getBlobUrl().split("\\?")[0]); // Hide SAS token in log
-
-                        // Create BlobSource from upload result
-                        // Match format from original FileSource (CSV in this case)
-                        BlobSource blobSource = new BlobSource(
-                                success.getBlobUrl(),
-                                Format.csv, // All our files are CSV format
-                                UUID.randomUUID(),
-                                CompressionType.GZIP // Uploader auto-compresses to GZIP
-                        );
-                        blobSources.add(blobSource);
-                    }
-
-                    if (blobSources.isEmpty()) {
-                        return CompletableFuture.failedFuture(
-                                new RuntimeException("All uploads failed - nothing to ingest"));
-                    }
-
-                    // Step 7: Ingest all blobs as a batch
-                    System.out.println("Ingesting " + blobSources.size() + " blobs as a batch...");
-                    return queuedIngestClient.ingestAsync(database, table, blobSources, properties)
-                            .thenCompose(response -> {
-                                System.out.println(
-                                        "Batch ingestion queued.
Operation ID: " - + response.getIngestResponse().getIngestionOperationId()); - System.out.println("Number of sources in batch: " + blobSources.size()); - return trackIngestionOperation(response, "Local Files Via Blob Upload"); - }); - }) - .whenComplete((unused, throwable) -> { - // Clean up uploader - uploader.close(); - System.out.println("ManagedUploader closed"); - }); + // Step 5: Convert successful uploads to BlobSource list + List blobSources = new ArrayList<>(); + for (var success : uploadResults.getSuccesses()) { + BlobSource blobSource = new BlobSource( + success.getBlobUrl(), + Format.csv, + UUID.randomUUID(), + CompressionType.GZIP); + blobSources.add(blobSource); + } + + if (blobSources.isEmpty()) { + throw new RuntimeException("All uploads failed - nothing to ingest"); + } + + // Step 6: Ingest all blobs as a batch + IngestRequestProperties properties = IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); + + System.out.println("Ingesting " + blobSources.size() + " blobs as a batch..."); + ExtendedIngestResponse response = queuedIngestClient + .ingestAsyncJava(database, table, blobSources, properties) + .get(2, TimeUnit.MINUTES); + + System.out.println("Batch ingestion queued successfully. Operation ID: " + + response.getIngestResponse().getIngestionOperationId()); + trackIngestionOperation(response, "Local Files Via Blob Upload"); + + } finally { + uploader.close(); + System.out.println("ManagedUploader closed"); + } } /** - * Tracks an ingestion operation by: 1. Getting operation details immediately after queueing 2. - * Polling for completion 3. Getting final operation details 4. Printing status information + * Tracks an ingestion operation synchronously (blocking). */ - private static CompletableFuture trackIngestionOperation( - ExtendedIngestResponse response, String operationName) { + private static void trackIngestionOperation( + ExtendedIngestResponse response, String operationName) throws Exception { IngestionOperation operation = new IngestionOperation( - Objects.requireNonNull( - response.getIngestResponse().getIngestionOperationId()), + Objects.requireNonNull(response.getIngestResponse().getIngestionOperationId()), database, table, response.getIngestionType()); @@ -494,44 +504,66 @@ private static CompletableFuture trackIngestionOperation( System.out.println("\n--- Tracking " + operationName + " ---"); // Get initial operation details + StatusResponse initialDetails = queuedIngestClient + .getOperationDetailsAsyncJava(operation) + .get(1, TimeUnit.MINUTES); + + System.out.println("[" + operationName + "] Initial Operation Details:"); + printStatusResponse(initialDetails); + + // Poll for completion + System.out.println("[" + operationName + "] Polling for completion..."); + queuedIngestClient.pollForCompletion( + operation, + Duration.ofSeconds(30), + Duration.ofMinutes(2)) + .get(3, TimeUnit.MINUTES); + + System.out.println("[" + operationName + "] Polling completed."); + + // Get final operation details + StatusResponse finalDetails = queuedIngestClient + .getOperationDetailsAsyncJava(operation) + .get(1, TimeUnit.MINUTES); + + System.out.println("[" + operationName + "] Final Operation Details:"); + printStatusResponse(finalDetails); + System.out.println("[" + operationName + "] Operation tracking completed.\n"); + } + + /** + * Tracks an ingestion operation asynchronously using CompletableFuture composition. + * Used by {@link #advancedAsyncIngestionExample()}. 
+     */
+    private static CompletableFuture<Void> trackIngestionOperationAsync(
+            ExtendedIngestResponse response, String operationName) {
+        IngestionOperation operation = new IngestionOperation(
+                Objects.requireNonNull(response.getIngestResponse().getIngestionOperationId()),
+                database,
+                table,
+                response.getIngestionType());
+
+        System.out.println("\n--- Tracking " + operationName + " (async) ---");
+
         return queuedIngestClient
-                .getOperationDetailsAsync(operation)
-                .thenCompose(
-                        initialDetails -> {
-                            System.out.println(
-                                    "[" + operationName + "] Initial Operation Details:");
-                            printStatusResponse(initialDetails);
-
-                            // Poll for completion
-                            System.out.println("[" + operationName + "] Polling for completion...");
-                            return queuedIngestClient.pollForCompletion(
-                                    operation,
-                                    Duration.ofSeconds(30),
-                                    Duration.ofMinutes(2)); // 2 minutes timeout
-                        })
-                .thenCompose(
-                        finalStatus -> {
-                            System.out.println("[" + operationName + "] Polling completed.");
-                            // Get final operation details
-                            return queuedIngestClient.getOperationDetailsAsync(operation);
-                        })
-                .thenAccept(
-                        finalDetails -> {
-                            System.out.println("[" + operationName + "] Final Operation Details:");
-                            printStatusResponse(finalDetails);
-                            System.out.println(
-                                    "[" + operationName + "] Operation tracking completed.\n");
-                        })
-                .exceptionally(
-                        error -> {
-                            System.err.println(
-                                    "["
-                                            + operationName
-                                            + "] Error tracking operation: "
-                                            + error.getMessage());
-                            error.printStackTrace();
-                            return null;
-                        });
+                .getOperationDetailsAsyncJava(operation)
+                .thenCompose(initialDetails -> {
+                    System.out.println("[" + operationName + "] Initial status received");
+                    return queuedIngestClient.pollForCompletion(
+                            operation,
+                            Duration.ofSeconds(30),
+                            Duration.ofMinutes(2));
+                })
+                .thenCompose(finalStatus -> queuedIngestClient.getOperationDetailsAsyncJava(operation))
+                .thenAccept(finalDetails -> {
+                    System.out.println("[" + operationName + "] Final Operation Details:");
+                    printStatusResponse(finalDetails);
+                    System.out.println("[" + operationName + "] Async tracking completed.\n");
+                })
+                .exceptionally(error -> {
+                    System.err.println("[" + operationName + "] Error: " + error.getMessage());
+                    return null;
+                });
     }
 
     /** Prints detailed status information from a StatusResponse */
@@ -586,4 +618,4 @@ private static void closeQuietly(InputStream stream) {
             System.err.println("Failed to close stream: " + e.getMessage());
         }
     }
-}
+}
\ No newline at end of file
diff --git a/samples/src/main/java/ingestv2/StreamingIngestV2.java b/samples/src/main/java/ingestv2/StreamingIngestV2.java
index 7c3b7149..d0cbae72 100644
--- a/samples/src/main/java/ingestv2/StreamingIngestV2.java
+++ b/samples/src/main/java/ingestv2/StreamingIngestV2.java
@@ -107,7 +107,7 @@ static void ingestFromStream() throws Exception {
                 .build();
 
         System.out.println("Ingesting CSV data from string...");
-        ExtendedIngestResponse ingestResponse = streamingIngestClient.ingestAsync(database, table, csvStreamSource, csvProperties).get();
+        ExtendedIngestResponse ingestResponse = streamingIngestClient.ingestAsyncJava(database, table, csvStreamSource, csvProperties).get();
         System.out.println(
                 "CSV ingestion completed.
Operation ID: " + ingestResponse.getIngestResponse().getIngestionOperationId()); @@ -123,7 +123,7 @@ static void ingestFromStream() throws Exception { UUID.randomUUID(), false); System.out.println("Ingesting compressed CSV file..."); - ExtendedIngestResponse compressedResponse = streamingIngestClient.ingestAsync(database, table, compressedStreamSource, csvProperties).get(); + ExtendedIngestResponse compressedResponse = streamingIngestClient.ingestAsyncJava(database, table, compressedStreamSource, csvProperties).get(); System.out.println( "Compressed CSV ingestion completed. Operation ID: " + compressedResponse.getIngestResponse().getIngestionOperationId()); @@ -144,7 +144,7 @@ static void ingestFromStream() throws Exception { .build(); System.out.println("Ingesting JSON file with mapping..."); - ExtendedIngestResponse jsonResponse = streamingIngestClient.ingestAsync(database, table, jsonStreamSource, jsonProperties).get(); + ExtendedIngestResponse jsonResponse = streamingIngestClient.ingestAsyncJava(database, table, jsonStreamSource, jsonProperties).get(); System.out.println( "JSON ingestion completed. Operation ID: " + jsonResponse.getIngestResponse().getIngestionOperationId()); @@ -174,7 +174,7 @@ static void ingestFromFile() throws Exception { .build(); System.out.println("Ingesting CSV file..."); - ExtendedIngestResponse csvResponse = streamingIngestClient.ingestAsync(database, table, csvFileSource, csvProperties).get(); + ExtendedIngestResponse csvResponse = streamingIngestClient.ingestAsyncJava(database, table, csvFileSource, csvProperties).get(); System.out.println( "CSV file ingestion completed. Operation ID: " + csvResponse.getIngestResponse().getIngestionOperationId()); @@ -192,7 +192,7 @@ static void ingestFromFile() throws Exception { .build(); System.out.println("Ingesting compressed JSON file with mapping..."); - ExtendedIngestResponse jsonResponse = streamingIngestClient.ingestAsync(database, table, jsonFileSource, jsonProperties).get(); + ExtendedIngestResponse jsonResponse = streamingIngestClient.ingestAsyncJava(database, table, jsonFileSource, jsonProperties).get(); System.out.println( "Compressed JSON file ingestion completed. Operation ID: " + jsonResponse.getIngestResponse().getIngestionOperationId());
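---

The RoundRobinContainerListTest above pins down the contract this PR relies on: an immutable container list with one shared, thread-safe cursor, so uploaders that share a configuration cache no longer all start at storage 0. As a reading aid, here is a minimal Kotlin sketch consistent with those tests; `RoundRobinSketch` is a hypothetical name, and the PR's actual `RoundRobinContainerList` implementation may differ in detail:

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch, not the PR's actual class: an immutable list plus a
// shared atomic cursor, matching the behavior the tests above assert.
class RoundRobinSketch<T>(private val items: List<T>) : Iterable<T> by items {
    private val cursor = AtomicInteger(0)

    val size: Int get() = items.size
    fun isEmpty(): Boolean = items.isEmpty()
    fun isNotEmpty(): Boolean = items.isNotEmpty()
    operator fun get(index: Int): T = items[index]

    // getAndIncrement eventually overflows past Int.MAX_VALUE; Math.floorMod
    // keeps the result in 0 until size even for negative counter values,
    // which is the property the overflow-stability test exercises.
    fun getNextStartIndex(): Int =
        if (items.isEmpty()) 0 else Math.floorMod(cursor.getAndIncrement(), items.size)
}

fun main() {
    val shared = RoundRobinSketch(listOf("storage0", "storage1", "storage2"))
    // Two uploaders sharing the same list no longer both start at storage0:
    println(shared.getNextStartIndex()) // 0 (uploader A's first upload)
    println(shared.getNextStartIndex()) // 1 (uploader B's first upload)
}
```

Note that because the increment is atomic, the 1000 concurrent calls in the distribution test land on each of the 4 containers exactly 250 times; the test's 10% tolerance is conservative rather than strictly necessary.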
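The pervasive `ingestAsync` → `ingestAsyncJava` and `getOperationDetailsAsync` → `getOperationDetailsAsyncJava` renames reflect a naming split that a removed comment in QueuedIngestV2 hints at: the Kotlin clients expose suspend functions, while Java callers get `CompletableFuture`-returning bridges. A hypothetical sketch of that bridging pattern, assuming kotlinx-coroutines-jdk8 on the classpath (class, method bodies, and parameters here are illustrative only, not the SDK's real signatures):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture

// Hypothetical sketch of the suspend-to-future bridging convention.
class BridgeSketch(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) {
    // Kotlin callers use the suspend function directly.
    suspend fun ingestAsync(source: String): String {
        // ... real ingestion work would happen here ...
        return "operation-id-for-$source"
    }

    // Java callers get a CompletableFuture wrapper, which is why the Java
    // samples in this diff switched to .ingestAsyncJava(...).get().
    fun ingestAsyncJava(source: String): CompletableFuture<String> =
        scope.future { ingestAsync(source) }
}
```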
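Finally, several samples above replace hand-rolled `https://` → `https://ingest-` string surgery with `IngestClientBase.getIngestionEndpoint`, guarded by a defensive null fallback. A hedged Kotlin sketch of that call-site pattern (`dmUrlFor` is a hypothetical helper name; the expected output assumes the usual cluster URL shape used throughout the samples):

```kotlin
import com.microsoft.azure.kusto.ingest.v2.IngestClientBase

// Mirrors the samples' "fallback if transformation not applicable" null check.
fun dmUrlFor(engineEndpoint: String): String =
    IngestClientBase.getIngestionEndpoint(engineEndpoint) ?: engineEndpoint

fun main() {
    println(dmUrlFor("https://mycluster.region.kusto.windows.net"))
    // expected: https://ingest-mycluster.region.kusto.windows.net
}
```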