50 commits
ac988ff
* Add boilerplate
ag-ramachandran Sep 2, 2025
40ffe44
* Add boilerplate and code generator
ag-ramachandran Sep 2, 2025
7260722
* Add boilerplate and code generator
ag-ramachandran Sep 2, 2025
9762b20
* Edits to code
ag-ramachandran Sep 3, 2025
41baae2
* Edits to code
ag-ramachandran Sep 3, 2025
73af48b
* Move code forward
ag-ramachandran Sep 8, 2025
5d6d331
* Move code forward
ag-ramachandran Sep 8, 2025
6e3f21a
*Additional edits
ag-ramachandran Sep 9, 2025
68c1714
*Additional edits
ag-ramachandran Sep 9, 2025
18afb7b
*Additional edits
ag-ramachandran Sep 9, 2025
3daa586
*Add tests
ag-ramachandran Sep 9, 2025
828e5b9
*Add changes to content negotiation
ag-ramachandran Sep 9, 2025
d2fac0b
*Update tests and POM
ag-ramachandran Sep 10, 2025
8710f02
*Minor edits
ag-ramachandran Sep 10, 2025
a7403f7
*Reformat code
ag-ramachandran Sep 10, 2025
57a151e
* Remove gitignore
ag-ramachandran Sep 18, 2025
9ccfd3d
*Address some of the review comments
ag-ramachandran Sep 18, 2025
c472f9c
* Fix some more review comments
ag-ramachandran Sep 26, 2025
6b74161
* Fix comments and push this as the base branch for IngestV2
ag-ramachandran Oct 16, 2025
7686bef
* Reformat code changes
ag-ramachandran Oct 16, 2025
0d12bab
* Rename retry data class
ag-ramachandran Oct 16, 2025
5c9d007
* Rename retry data class
ag-ramachandran Oct 16, 2025
2b74d1c
* Remove unused classes
ag-ramachandran Oct 30, 2025
3eab813
* Remove unused classes
ag-ramachandran Oct 30, 2025
86d3f79
* Fix comment on substring chaining
ag-ramachandran Oct 31, 2025
0022577
Feature/add ingestion source blob (#440)
tanmaya-panda1 Dec 2, 2025
0bb1281
Feature/add local file source v2 (#443)
tanmaya-panda1 Dec 2, 2025
ba3f817
disabled serialized execution
tanmaya-panda1 Dec 3, 2025
ab64a1b
optimized QueuedIngestionClientTests execution
tanmaya-panda1 Dec 4, 2025
8b63150
Feature/add uploader (#446)
ag-ramachandran Dec 17, 2025
8cabbc5
* Refactor to add GZIP Compression (#449)
ag-ramachandran Dec 29, 2025
4a17c49
* Remove test for missing blob
ag-ramachandran Dec 29, 2025
307ff6a
* Rebase POM changes
ag-ramachandran Dec 29, 2025
e85af02
* Rebase POM changes
ag-ramachandran Dec 29, 2025
1d0a6fa
* Rebase POM changes
ag-ramachandran Dec 29, 2025
3a31d4d
* Add JACOCO for coverage
ag-ramachandran Dec 30, 2025
4931ac0
Feature/fix method signature orders (#452)
ag-ramachandran Dec 31, 2025
bca955e
* Rebase changes with master
ag-ramachandran Jan 8, 2026
d6a66f7
Feature/add junits (#451)
tanmaya-panda1 Jan 8, 2026
4ebf781
* Remove unused test assertion
ag-ramachandran Jan 8, 2026
d3428ec
* Remove JVMOverloads and add log supression for bytebuddy in MOCKK a…
ag-ramachandran Jan 8, 2026
bdce366
* Fix lint in tests
ag-ramachandran Jan 8, 2026
b9ccf14
* Minor changes to Streaming ingest tests (#454)
ag-ramachandran Jan 9, 2026
ce36f64
Users/ramacg/fix review comments (#457)
ag-ramachandran Jan 15, 2026
8a41876
added code for wellknown kusto endpoints (#456)
tanmaya-panda1 Jan 20, 2026
66de6e7
Users/tanmayapanda/address review comments (#458)
tanmaya-panda1 Jan 20, 2026
4c3d8f6
addressed review comments (#460)
tanmaya-panda1 Jan 20, 2026
5f296db
Users/ramacg/refactor managed uploader (#455)
ag-ramachandran Jan 21, 2026
b1ec311
* Fix JDK version in POM (#461)
ag-ramachandran Jan 21, 2026
f61924c
added private link and removed tuple/Pair (#462)
tanmaya-panda1 Jan 21, 2026
2 changes: 1 addition & 1 deletion data/pom.xml
@@ -258,4 +258,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
38 changes: 38 additions & 0 deletions ingest-v2/README.md
@@ -0,0 +1,38 @@
# ingest-v2

This project was created using the [Ktor Project Generator](https://start.ktor.io).

Here are some useful links to get you started:

- [Ktor Documentation](https://ktor.io/docs/home.html)
- [Ktor GitHub page](https://github.com/ktorio/ktor)
- The [Ktor Slack chat](https://app.slack.com/client/T09229ZC6/C0A974TJ9). You'll need to [request an invite](https://surveys.jetbrains.com/s3/kotlin-slack-sign-up) to join.

## Features

Here's a list of features included in this project:

| Name | Description |
|------------------------------------------------------------------------|------------------------------------------------------------------------------------|
| [Content Negotiation](https://start.ktor.io/p/content-negotiation) | Provides automatic content conversion according to Content-Type and Accept headers |
| [Routing](https://start.ktor.io/p/routing) | Provides a structured routing DSL |
| [kotlinx.serialization](https://start.ktor.io/p/kotlinx-serialization) | Handles JSON serialization using kotlinx.serialization library |
| [AsyncAPI](https://start.ktor.io/p/asyncapi) | Generates and serves AsyncAPI documentation |

## Building & Running

To build or run the project, use one of the following tasks:

| Task | Description |
|--------------------------------------------------------------|-------------------|
| `mvn test` | Run the tests |
| `mvn package` | Build the project |
| `java -jar target/ingest-v2-0.0.1-jar-with-dependencies.jar` | Run the server |

If the server starts successfully, you'll see the following output:

```
2024-12-04 14:32:45.584 [main] INFO Application - Application started in 0.303 seconds.
2024-12-04 14:32:45.682 [main] INFO Application - Responding at http://0.0.0.0:8080
```

416 changes: 416 additions & 0 deletions ingest-v2/pom.xml

Large diffs are not rendered by default.

@@ -0,0 +1,105 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.ingest.v2

import com.azure.core.credential.TokenCredential
import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException
import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.models.S2SToken
import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse
import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
import io.ktor.http.HttpStatusCode
import org.slf4j.LoggerFactory
import java.net.ConnectException

class ConfigurationClient(
override val dmUrl: String,
override val tokenCredential: TokenCredential,
override val skipSecurityChecks: Boolean = false,
override val clientDetails: ClientDetails,
override val s2sTokenProvider: (suspend () -> S2SToken)? = null,
override val s2sFabricPrivateLinkAccessContext: String? = null,
) :
KustoBaseApiClient(
dmUrl,
tokenCredential,
skipSecurityChecks,
clientDetails,
s2sTokenProvider = s2sTokenProvider,
s2sFabricPrivateLinkAccessContext = s2sFabricPrivateLinkAccessContext,
) {
private val logger =
LoggerFactory.getLogger(ConfigurationClient::class.java)
private val baseUrl = "$dmUrl/v1/rest/ingestion/configuration"

suspend fun getConfigurationDetails(): ConfigurationResponse {
try {
val configurationHttpResponse: HttpResponse<ConfigurationResponse> =
api.getIngestConfiguration()
if (configurationHttpResponse.success) {
logger.info(
"Successfully retrieved configuration details from $dmUrl with status: ${configurationHttpResponse.status}",
)
logger.debug(
"Configuration details: {}",
configurationHttpResponse.body(),
)
return configurationHttpResponse.body()
} else if (
configurationHttpResponse.status ==
HttpStatusCode.NotFound.value
) {
/*
404 is a special case - it indicates that the endpoint is not found. This may be a transient
network issue
*/
val message =
"Endpoint $dmUrl not found. Please ensure the cluster supports queued ingestion."
logger.error(
"{}. Status: {}",
message,
configurationHttpResponse.status,
)
throw IngestException(
message = message,
cause = ConnectException(message),
failureCode = configurationHttpResponse.status,
failureSubCode = "",
isPermanent = false,
)
} else {
val configurationResponseBody = configurationHttpResponse.body()
val message =
"Failed to retrieve configuration details from $baseUrl. Status: ${configurationHttpResponse.status}, " +
"Body: $configurationResponseBody"
logger.error("{}", message)
throw IngestException(
message = message,
failureCode = configurationHttpResponse.status,
)
}
} catch (notAbleToReachHost: ConnectException) {
val message =
"Failed to reach $baseUrl. Please ensure the cluster address is correct and the cluster is reachable."
throw IngestException(
message = message,
cause = notAbleToReachHost,
failureCode = HttpStatusCode.NotFound.value,
failureSubCode = "",
isPermanent = false,
)
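} catch (ex: Exception) {
// Rethrow IngestException unchanged, and never wrap coroutine cancellation
if (ex is IngestException || ex is kotlin.coroutines.cancellation.CancellationException) throw ex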
val message =
"An unexpected error occurred while trying to reach $baseUrl"
throw IngestException(
message = message,
cause = ex,
// Mark this as a 5xx series error
failureCode = HttpStatusCode.InternalServerError.value,
failureSubCode = "",
isPermanent = true,
)
}
}
}
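
The `isPermanent` flag set in the branches above lets a caller decide whether a failed configuration fetch is worth retrying. The following is a minimal sketch of that idea, not the retry policy shipped in this PR: the `IngestException` class here is a simplified stand-in that models only the fields used, and `retryTransient`, `maxRetries`, and `pause` are hypothetical names introduced for illustration.

```kotlin
// Hypothetical stand-in for the PR's IngestException; only the fields used below are modeled.
class IngestException(
    message: String,
    cause: Throwable? = null,
    val failureCode: Int? = null,
    val isPermanent: Boolean? = null,
) : Exception(message, cause)

// Runs `block`, retrying only when the failure is explicitly marked as transient.
// `maxRetries` and `pause` are illustrative parameters, not part of the PR.
fun <T> retryTransient(
    maxRetries: Int = 3,
    pause: (attempt: Int) -> Unit = { Thread.sleep(1_000L * it) },
    block: () -> T,
): T {
    var lastError: IngestException? = null
    for (attempt in 0..maxRetries) {
        try {
            return block()
        } catch (ex: IngestException) {
            // An unset flag is treated as permanent so unknown failures are not retried blindly.
            if (ex.isPermanent != false) throw ex
            lastError = ex
            if (attempt < maxRetries) pause(attempt + 1)
        }
    }
    throw lastError ?: IllegalStateException("maxRetries must not be negative")
}

fun main() {
    var calls = 0
    val result = retryTransient(pause = {}) {
        calls++
        // Simulate two transient 404 responses before a successful call.
        if (calls < 3) throw IngestException("not found", failureCode = 404, isPermanent = false)
        "configuration"
    }
    println("$result after $calls calls")
}
```

With this shape, the 404 and `ConnectException` paths (both `isPermanent = false`) would be retried, while the catch-all 500 path (`isPermanent = true`) would surface immediately.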
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.ingest.v2

// Size of each block to upload to Azure Blob Storage (4 MB)
const val UPLOAD_BLOCK_SIZE_BYTES: Long = 4 * 1024 * 1024

// Maximum size for a single upload operation to Azure Blob Storage (256 MB)
const val UPLOAD_MAX_SINGLE_SIZE_BYTES: Long = 256 * 1024 * 1024

// Request timeout in milliseconds for Kusto API HTTP requests
const val KUSTO_API_REQUEST_TIMEOUT_MS: Long = 60_000

// Connection timeout in milliseconds for Kusto API HTTP requests
const val KUSTO_API_CONNECT_TIMEOUT_MS: Long = 60_000

// Socket timeout in milliseconds for Kusto API HTTP requests
const val KUSTO_API_SOCKET_TIMEOUT_MS: Long = 60_000

// Kusto API version used in HTTP requests
const val KUSTO_API_VERSION = "2024-12-12"

// Default refresh interval for configuration cache (1 hour)
const val CONFIG_CACHE_DEFAULT_REFRESH_INTERVAL_HOURS: Long = 1

// Default value for skipSecurityChecks if not provided
const val CONFIG_CACHE_DEFAULT_SKIP_SECURITY_CHECKS: Boolean = false

// Default interval between retries for SimpleRetryPolicy (10 seconds)
const val INGEST_RETRY_POLICY_DEFAULT_INTERVAL_SECONDS: Long = 10

// Default total number of retries for SimpleRetryPolicy
const val INGEST_RETRY_POLICY_DEFAULT_TOTAL_RETRIES: Int = 3

// Default timeout for blob upload operations (1 hour)
const val BLOB_UPLOAD_TIMEOUT_HOURS: Long = 1

// Default retry intervals for CustomRetryPolicy (1s, 3s, 7s)
val INGEST_RETRY_POLICY_CUSTOM_INTERVALS: Array<Long> = arrayOf(1, 3, 7)

// Maximum number of blobs to upload in a single batch
const val MAX_BLOBS_PER_BATCH: Int = 70

// Default maximum data size for blob upload operations (4 GB)
const val UPLOAD_CONTAINER_MAX_DATA_SIZE_BYTES: Long = 4L * 1024 * 1024 * 1024

// Default maximum concurrency for blob upload operations
const val UPLOAD_CONTAINER_MAX_CONCURRENCY: Int = 4

const val STREAMING_MAX_REQ_BODY_SIZE = 10 * 1024 * 1024 // 10 MB

// Managed Streaming Policy Defaults

// Default value for continueWhenStreamingIngestionUnavailable in ManagedStreamingPolicy
// When false, the client will fail if streaming ingestion is unavailable
const val MANAGED_STREAMING_CONTINUE_WHEN_UNAVAILABLE_DEFAULT: Boolean = false

// Default data size factor for ManagedStreamingPolicy
// Factor used to determine size threshold for queued ingestion (1.0 = no adjustment)
const val MANAGED_STREAMING_DATA_SIZE_FACTOR_DEFAULT: Double = 1.0

// Default throttle backoff period in seconds for ManagedStreamingPolicy
// How long to use queued ingestion after streaming is throttled
const val MANAGED_STREAMING_THROTTLE_BACKOFF_SECONDS: Long = 10

// Default time until resuming streaming ingestion in minutes for ManagedStreamingPolicy
// How long to use queued ingestion after streaming becomes unavailable
const val MANAGED_STREAMING_RESUME_TIME_MINUTES: Long = 15

// Default retry delays for ManagedStreamingPolicy (in seconds): 1s, 2s, 4s
val MANAGED_STREAMING_RETRY_DELAYS_SECONDS: Array<Long> = arrayOf(1, 2, 4)

// Maximum jitter to add to retry delays in milliseconds
const val MANAGED_STREAMING_RETRY_JITTER_MS: Long = 1000

// Buffer size used when compressing streams (64 KB)
const val STREAM_COMPRESSION_BUFFER_SIZE_BYTES: Int = 64 * 1024

// Buffer size for the stream pipe (1 MB)
const val STREAM_PIPE_BUFFER_SIZE_BYTES: Int = 1024 * 1024
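
The managed streaming retry delays (1s, 2s, 4s) and the 1000 ms jitter cap defined above could plausibly be combined as sketched below. This is an assumption about how such constants are typically used, not the PR's actual implementation: `retryDelayMs` is a hypothetical helper, and the two values are copied locally so the snippet compiles on its own.

```kotlin
import kotlin.random.Random

// Values copied from the constants above so the sketch is self-contained.
val RETRY_DELAYS_SECONDS: List<Long> = listOf(1L, 2L, 4L)
const val RETRY_JITTER_MS: Long = 1000

// Hypothetical helper: delay in milliseconds before retry number `attempt` (0-based),
// or null once the configured delays are exhausted.
fun retryDelayMs(attempt: Int, random: Random = Random.Default): Long? {
    val baseSeconds = RETRY_DELAYS_SECONDS.getOrNull(attempt) ?: return null
    // Add a uniformly random jitter in [0, RETRY_JITTER_MS] so clients do not retry in lockstep.
    return baseSeconds * 1000 + random.nextLong(RETRY_JITTER_MS + 1)
}

fun main() {
    for (attempt in 0..3) {
        println("attempt $attempt -> ${retryDelayMs(attempt)} ms")
    }
}
```

Returning `null` after the last configured delay gives the caller a clear signal to stop retrying streaming and, for example, fall back to queued ingestion.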