@@ -80,18 +80,33 @@ interface IngestClient : Closeable {
interface MultiIngestClient : IngestClient {

/**
- * Ingest data from multiple sources.
+ * Ingest data from multiple blob sources.
*
- * @param sources The sources to ingest.
+ * **Important:** Multi-blob ingestion only supports [BlobSource]. This
+ * design avoids partial failure scenarios where some local sources might be
+ * uploaded successfully while others fail, leaving the user in an
+ * inconsistent state.
+ *
+ * **For local files/streams**, you have two options:
+ * 1. **Single-source ingestion**: Use `ingestAsync(source, properties)`
+ * with a single [com.microsoft.azure.kusto.ingest.v2.source.LocalSource]
+ * (FileSource or StreamSource). The client handles upload internally.
+ * 2. **Multi-source ingestion**: Use
+ * [com.microsoft.azure.kusto.ingest.v2.uploader.IUploader] to upload
+ * local sources to blob storage first, then call this method with the
+ * resulting [BlobSource] objects.
+ *
+ * @param sources The blob sources to ingest. All sources must be
+ * [BlobSource] instances.
* @param ingestRequestProperties Ingestion properties containing database,
* table, format, and other settings.
- * @return An [IngestionOperation] object that can be used to track the
- * status of the ingestion.
+ * @return An [ExtendedIngestResponse] containing the ingestion operation
+ * details.
*/
suspend fun ingestAsync(
database: String,
table: String,
- sources: List<IngestionSource>,
+ sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): ExtendedIngestResponse
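
A minimal usage sketch of the revised contract described in the KDoc above, assuming hypothetical names (`client`, `uploader`, `props`) and an illustrative `Format.csv` value; SDK imports other than the coroutine helpers are omitted. Callers that previously mixed local sources into a multi-source call now upload first, which is exactly what the removed `uploadLocalSourcesAsync` helper used to do internally:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Sketch only: `client`, `uploader`, and `props` come from application setup,
// and Format.csv / the file paths are illustrative.
suspend fun ingestLocalFiles(
    client: MultiIngestClient,
    uploader: IUploader,
    props: IngestRequestProperties?,
) {
    val locals = listOf(
        FileSource(Path.of("data/part-1.csv"), Format.csv),
        FileSource(Path.of("data/part-2.csv"), Format.csv),
    )
    // Upload in parallel, then hand only BlobSources to the multi-blob API.
    val blobs: List<BlobSource> = coroutineScope {
        locals.map { async { uploader.uploadAsync(it) } }.awaitAll()
    }
    client.ingestAsync("MyDatabase", "MyTable", blobs, props)
}
```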

@@ -28,9 +28,6 @@ import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
- import kotlinx.coroutines.async
- import kotlinx.coroutines.awaitAll
- import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.future
import kotlinx.coroutines.withTimeoutOrNull
@@ -74,13 +71,17 @@ internal constructor(
private val logger = LoggerFactory.getLogger(QueuedIngestClient::class.java)

/**
- * Ingests data from multiple sources with the given properties. This is the
- * suspend function for Kotlin callers.
+ * Ingests data from multiple blob sources with the given properties. This
+ * is the suspend function for Kotlin callers.
+ *
+ * Multi-blob ingestion only supports [BlobSource]. The blobs are assumed to
+ * already exist in blob storage, so no upload is performed; the request is
+ * sent directly to the Data Management service.
*/
override suspend fun ingestAsync(
database: String,
table: String,
- sources: List<IngestionSource>,
+ sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): ExtendedIngestResponse =
ingestAsyncInternal(
@@ -108,14 +109,18 @@ )
)

/**
- * Ingests data from multiple sources with the given properties. This is the
- * Java-friendly version that returns a CompletableFuture.
+ * Ingests data from multiple blob sources with the given properties. This
+ * is the Java-friendly version that returns a CompletableFuture.
+ *
+ * Multi-blob ingestion only supports [BlobSource]. The blobs are assumed to
+ * already exist in blob storage, so no upload is performed; the request is
+ * sent directly to the Data Management service.
*/
@JvmName("ingestAsync")
fun ingestAsyncJava(
database: String,
table: String,
- sources: List<IngestionSource>,
+ sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): CompletableFuture<ExtendedIngestResponse> =
CoroutineScope(Dispatchers.IO).future {
@@ -197,14 +202,18 @@ )
)
}
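
From Java this overload is visible as `ingestAsync` (the `@JvmName` above); Kotlin callers reach it as `ingestAsyncJava`. A sketch of consuming the returned future, with `client`, `blobs`, and `props` assumed from the earlier example:

```kotlin
// join() blocks the calling thread, which is the expected pattern for Java
// callers; coroutine code should prefer the suspend overload instead.
val response: ExtendedIngestResponse =
    client.ingestAsyncJava("MyDatabase", "MyTable", blobs, props).join()
```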

- /** Internal implementation of ingestAsync for multiple sources. */
+ /**
+ * Internal implementation of ingestAsync for multiple blob sources.
+ *
+ * This method only accepts [BlobSource]; no upload is performed. The blobs
+ * are assumed to already exist in blob storage.
+ */
private suspend fun ingestAsyncInternal(
database: String,
table: String,
- sources: List<IngestionSource>,
+ sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): ExtendedIngestResponse {
- // Extract database and table from properties
+ // Validate sources list is not empty
require(sources.isNotEmpty()) { "sources list cannot be empty" }
val maxBlobsPerBatch = getMaxSourcesPerMultiIngest()
@@ -228,17 +237,14 @@
differentFormatBlob.joinToString(", "),
)
throw IngestClientException(
"All blobs in the request must have the same format. All blobs in the request must have the same format.Received formats: $differentFormatBlob",
message =
"All blobs in the request must have the same format. Received formats: $differentFormatBlob",
)
}
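
The computation of `differentFormatBlob` is collapsed above; the enforced invariant amounts to the following sketch, assuming `BlobSource` exposes its `Format`:

```kotlin
// Equivalent invariant, not the SDK's literal code: a batch must carry
// exactly one format across all of its blob sources.
val formats = sources.map { it.format }.toSet()
require(formats.size == 1) {
    "All blobs in the request must have the same format. Received formats: $formats"
}
```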

- // Split sources and upload local sources in parallel
- val blobSources = uploadLocalSourcesAsync(sources)

// Check for duplicate blob URLs
val duplicates =
- blobSources
- .groupBy { sanitizeBlobUrl(it.blobPath) }
+ sources.groupBy { sanitizeBlobUrl(it.blobPath) }
.filter { it.value.size > 1 }

if (duplicates.isNotEmpty()) {
@@ -251,12 +257,14 @@
"{Url: $url, Source Ids: [$sourceIds]}"
}
throw IngestClientException(
+ message =
"Duplicate blob sources detected in the request: [$duplicateInfo]",
)
}

// Create blob objects for the request
val blobs =
- blobSources.map {
+ sources.map {
Blob(
it.blobPath,
sourceId = it.sourceId.toString(),
@@ -340,6 +348,7 @@ }
}
else -> {
throw IngestClientException(
+ message =
"Unsupported ingestion source type: ${source::class.simpleName}",
)
}
@@ -382,55 +391,6 @@ }
}
}

- /**
- * Splits sources into BlobSources and LocalSources, uploads LocalSources in
- * parallel, and returns a unified list of BlobSources.
- *
- * @param sources The list of ingestion sources to process
- * @return A list of BlobSources including both original BlobSources and
- * uploaded LocalSources
- * @throws IngestClientException if an unsupported source type is
- * encountered
- */
- private suspend fun uploadLocalSourcesAsync(
- sources: List<IngestionSource>,
- ): List<BlobSource> {
- // Split sources into BlobSources and LocalSources
- val blobSources = mutableListOf<BlobSource>()
- val localSources = mutableListOf<LocalSource>()
-
- sources.forEach { source ->
- when (source) {
- is BlobSource -> blobSources.add(source)
- is LocalSource -> localSources.add(source)
- else ->
- throw IngestClientException(
- "Unsupported ingestion source type: ${source::class.simpleName}",
- )
- }
- }
-
- // Upload LocalSources in parallel and collect the resulting BlobSources
- if (localSources.isNotEmpty()) {
- logger.info(
- "Uploading ${localSources.size} local source(s) to blob storage",
- )
- val uploadedBlobs = coroutineScope {
- localSources
- .map { localSource ->
- async { uploader.uploadAsync(localSource) }
- }
- .awaitAll()
- }
- blobSources.addAll(uploadedBlobs)
- logger.info(
- "Successfully uploaded ${uploadedBlobs.size} local source(s)",
- )
- }
-
- return blobSources
- }

/**
* Sanitizes a blob URL by removing the SAS token and query parameters to
* allow proper duplicate detection.
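
The body of `sanitizeBlobUrl` is collapsed in this diff; a plausible sketch of the idea: SAS tokens live in the query string, so stripping it lets two URLs for the same blob compare as duplicates.

```kotlin
// Plausible sketch (real implementation not shown in the diff): drop the
// query string, which carries the SAS token, before comparing blob URLs.
fun sanitizeBlobUrl(blobPath: String): String = blobPath.substringBefore('?')
```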
@@ -69,6 +69,59 @@ class DefaultConfigurationCache(
val clientDetails: ClientDetails,
val configurationProvider: (suspend () -> ConfigurationResponse)? = null,
) : ConfigurationCache {
+ companion object {
+ /**
+ * Creates a DefaultConfigurationCache for Java callers.
+ *
+ * This factory method provides a convenient way to create a cache from
+ * Java without dealing with Kotlin named parameters.
+ *
+ * @param dmUrl Data management endpoint URL
+ * @param tokenCredential Authentication credentials
+ * @param clientDetails Client identification details for tracking
+ * @return A new DefaultConfigurationCache instance
+ */
+ @JvmStatic
+ fun create(
+ dmUrl: String,
+ tokenCredential: TokenCredential,
+ clientDetails: ClientDetails,
+ ): DefaultConfigurationCache =
+ DefaultConfigurationCache(
+ dmUrl = dmUrl,
+ tokenCredential = tokenCredential,
+ clientDetails = clientDetails,
+ )
+
+ /**
+ * Creates a DefaultConfigurationCache with all options for Java
+ * callers.
+ *
+ * @param dmUrl Data management endpoint URL
+ * @param tokenCredential Authentication credentials
+ * @param skipSecurityChecks Whether to skip security validation
+ * @param clientDetails Client identification details for tracking
+ * @param refreshInterval Duration after which cached configuration is
+ * stale
+ * @return A new DefaultConfigurationCache instance
+ */
+ @JvmStatic
+ fun create(
+ dmUrl: String,
+ tokenCredential: TokenCredential,
+ skipSecurityChecks: Boolean,
+ clientDetails: ClientDetails,
+ refreshInterval: Duration,
+ ): DefaultConfigurationCache =
+ DefaultConfigurationCache(
+ refreshInterval = refreshInterval,
+ dmUrl = dmUrl,
+ tokenCredential = tokenCredential,
+ skipSecurityChecks = skipSecurityChecks,
+ clientDetails = clientDetails,
+ )
+ }
+
init {
if (
configurationProvider == null &&
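
The `@JvmStatic` factories exist so Java callers get plain overloads instead of Kotlin named/default parameters. A Kotlin sketch of the simple form; the endpoint, credential, and `clientDetails` value are illustrative:

```kotlin
import com.azure.identity.DefaultAzureCredentialBuilder

// Illustrative values; `clientDetails` is assumed to be built elsewhere
// according to the SDK's ClientDetails API.
val cache = DefaultConfigurationCache.create(
    "https://ingest-mycluster.kusto.windows.net",
    DefaultAzureCredentialBuilder().build(),
    clientDetails,
)
```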
@@ -13,7 +13,9 @@ import java.nio.file.Path
import java.util.UUID

/** Represents a file-based ingestion source. */
- class FileSource(
+ class FileSource
+ @JvmOverloads
+ constructor(
val path: Path,
format: Format,
sourceId: UUID = UUID.randomUUID(),
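
`@JvmOverloads` makes the compiler emit a constructor overload for each trailing defaulted parameter, so Java callers can now write `new FileSource(path, format)` without a `sourceId`; Kotlin callers already had the defaults. Sketch with an illustrative path and `Format` value:

```kotlin
// Java now sees FileSource(Path, Format) in addition to the full constructor.
val file = FileSource(Path.of("data/events.csv"), Format.csv)
```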
@@ -7,10 +7,12 @@ import java.io.InputStream
import java.util.UUID

/** Represents a stream-based ingestion source. */
- class StreamSource(
+ class StreamSource
+ @JvmOverloads
+ constructor(
stream: InputStream,
format: Format,
- sourceCompression: CompressionType,
+ sourceCompression: CompressionType = CompressionType.NONE,
sourceId: UUID = UUID.randomUUID(),
leaveOpen: Boolean = false,
) : LocalSource(format, leaveOpen, sourceCompression, sourceId) {
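
`StreamSource` gets the same treatment, plus a `CompressionType.NONE` default, so the common uncompressed-stream case needs only a stream and a format. Sketch; `Format.json` and `CompressionType.GZIP` are assumed enum cases:

```kotlin
import java.io.FileInputStream

// Uncompressed stream: sourceCompression defaults to CompressionType.NONE.
val plain = StreamSource(FileInputStream("data/events.json"), Format.json)

// Pre-compressed payloads still declare their compression explicitly.
val gzipped =
    StreamSource(FileInputStream("data/events.json.gz"), Format.json, CompressionType.GZIP)
```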