Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.ingest.v2

import java.net.URI

/**
* Base utility object containing common functionality for ingest clients.
*
* This implementation matches the Java IngestClientBase behavior for handling
* special URLs like localhost, IP addresses, and reserved hostnames.
*/
object IngestClientBase {
private const val INGEST_PREFIX = "ingest-"
private const val PROTOCOL_SUFFIX = "://"

/**
* Converts a cluster URL to an ingestion endpoint URL by adding the
* "ingest-" prefix.
*
* Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net) are
* returned unchanged to support local development and testing scenarios.
*
* @param clusterUrl The cluster URL to convert
* @return The ingestion endpoint URL with "ingest-" prefix, or the original
* URL for special cases
*/
@JvmStatic
fun getIngestionEndpoint(clusterUrl: String?): String? {
if (clusterUrl == null ||
clusterUrl.contains(INGEST_PREFIX) ||
isReservedHostname(clusterUrl)
) {
return clusterUrl
}
return if (clusterUrl.contains(PROTOCOL_SUFFIX)) {
clusterUrl.replaceFirst(
PROTOCOL_SUFFIX,
"$PROTOCOL_SUFFIX$INGEST_PREFIX",
)
} else {
INGEST_PREFIX + clusterUrl
}
}

/**
* Converts an ingestion endpoint URL to a query endpoint URL by removing
* the "ingest-" prefix.
*
* Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net) are
* returned unchanged.
*
* @param clusterUrl The ingestion endpoint URL to convert
* @return The query endpoint URL without "ingest-" prefix, or the original
* URL for special cases
*/
@JvmStatic
fun getQueryEndpoint(clusterUrl: String?): String? {
return if (clusterUrl == null || isReservedHostname(clusterUrl)) {
clusterUrl
} else {
clusterUrl.replaceFirst(INGEST_PREFIX, "")
}
}

/**
* Checks if the given URL points to a reserved hostname that should not
* have the "ingest-" prefix added.
*
* Reserved hostnames include:
* - localhost
* - IPv4 addresses (e.g., 127.0.0.1, 192.168.1.1)
* - IPv6 addresses (e.g., [2345:0425:2CA1:0000:0000:0567:5673:23b5])
* - onebox.dev.kusto.windows.net (development environment)
* - Non-absolute URIs
*
* @param rawUri The URL to check
* @return true if the URL is a reserved hostname, false otherwise
*/
@JvmStatic
fun isReservedHostname(rawUri: String): Boolean {
val uri =
try {
URI.create(rawUri)
} catch (_: IllegalArgumentException) {
return true
}

if (!uri.isAbsolute) {
return true
}

val authority = uri.authority?.lowercase() ?: return true

// Check for IPv6 address (wrapped in brackets)
val isIpAddress =
if (authority.startsWith("[") && authority.endsWith("]")) {
true
} else {
isIPv4Address(authority)
}

val isLocalhost = authority.contains("localhost")
val host = uri.host?.lowercase() ?: ""

return isLocalhost ||
isIpAddress ||
host.equals("onebox.dev.kusto.windows.net", ignoreCase = true)
}

/**
* Checks if the given string is a valid IPv4 address.
*
* This method validates that the string consists of exactly 4 octets, each
* being a number between 0 and 255.
*
* @param address The string to check (may include port like "127.0.0.1:8080")
* @return true if the string is a valid IPv4 address, false otherwise
*/
private fun isIPv4Address(address: String): Boolean {
// Remove port if present (e.g., "127.0.0.1:8080" -> "127.0.0.1")
val hostPart =
if (address.contains(":")) {
address.substringBeforeLast(":")
} else {
address
}

val parts = hostPart.split(".")
if (parts.size != 4) {
return false
}

return parts.all { part ->
val num = part.toIntOrNull()
num != null && num in 0..255
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ open class KustoBaseApiClient(
}

val engineUrl: String
get() = dmUrl.replace(Regex("https://ingest-"), "https://")
get() = IngestClientBase.getQueryEndpoint(dmUrl) ?: dmUrl

val api: DefaultApi by lazy {
DefaultApi(baseUrl = dmUrl, httpClientConfig = setupConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.microsoft.azure.kusto.ingest.v2.builders

import com.azure.core.credential.TokenCredential
import com.microsoft.azure.kusto.ingest.v2.IngestClientBase
import com.microsoft.azure.kusto.ingest.v2.KustoBaseApiClient
import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_CONCURRENCY
import com.microsoft.azure.kusto.ingest.v2.UPLOAD_CONTAINER_MAX_DATA_SIZE_BYTES
Expand All @@ -21,11 +22,6 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> {
protected var s2sTokenProvider: (suspend () -> S2SToken)? = null
protected var s2sFabricPrivateLinkAccessContext: String? = null

// Added properties for ingestion endpoint and authentication
protected var ingestionEndpoint: String? = null
protected var clusterEndpoint: String? = null
protected var authentication: TokenCredential? = null

protected var maxConcurrency: Int = UPLOAD_CONTAINER_MAX_CONCURRENCY
protected var maxDataSize: Long = UPLOAD_CONTAINER_MAX_DATA_SIZE_BYTES
protected var ignoreFileSize: Boolean = false
Expand All @@ -37,7 +33,6 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> {

fun withAuthentication(credential: TokenCredential): T {
this.tokenCredential = credential
this.authentication = credential // Set authentication
return self()
}

Expand Down Expand Up @@ -169,39 +164,34 @@ abstract class BaseIngestClientBuilder<T : BaseIngestClientBuilder<T>> {
.build()
}

protected fun setEndpoint(endpoint: String) {
this.ingestionEndpoint = normalizeAndCheckDmUrl(endpoint)
this.clusterEndpoint = normalizeAndCheckEngineUrl(endpoint)
}

companion object {
/**
* Converts an ingestion endpoint URL to a query/engine endpoint URL by
* removing the "ingest-" prefix.
*
* Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net)
* are returned unchanged.
*
* @param clusterUrl The ingestion endpoint URL to convert
* @return The query endpoint URL without "ingest-" prefix
*/
protected fun normalizeAndCheckEngineUrl(clusterUrl: String): String {
val normalizedUrl =
if (clusterUrl.matches(Regex("https://ingest-[^/]+.*"))) {
// If the URL starts with https://ingest-, remove ingest-
clusterUrl.replace(
Regex("https://ingest-([^/]+)"),
"https://$1",
)
} else {
clusterUrl
}
return normalizedUrl
return IngestClientBase.getQueryEndpoint(clusterUrl) ?: clusterUrl
}

/**
* Converts a cluster URL to an ingestion endpoint URL by adding the
* "ingest-" prefix.
*
* Special URLs (localhost, IP addresses, onebox.dev.kusto.windows.net)
* are returned unchanged to support local development and testing.
*
* @param dmUrl The cluster URL to convert
* @return The ingestion endpoint URL with "ingest-" prefix
*/
@JvmStatic
protected fun normalizeAndCheckDmUrl(dmUrl: String): String {
val normalizedUrl =
if (dmUrl.matches(Regex("https://(?!ingest-)[^/]+.*"))) {
// If the URL starts with https:// and does not already have ingest-, add it
dmUrl.replace(
Regex("https://([^/]+)"),
"https://ingest-$1",
)
} else {
dmUrl
}
return normalizedUrl
return IngestClientBase.getIngestionEndpoint(dmUrl) ?: dmUrl
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class QueuedIngestClientBuilder private constructor(private val dmUrl: String) :
}

fun build(): QueuedIngestClient {
setEndpoint(dmUrl)
requireNotNull(tokenCredential) {
"Authentication is required. Call withAuthentication() before build()"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ private constructor(private val clusterUrl: String) :
}

fun build(): StreamingIngestClient {
setEndpoint(clusterUrl)
requireNotNull(tokenCredential) {
"Authentication is required. Call withAuthentication() before build()"
}
validateParameters()
val effectiveClientDetails =
clientDetails ?: ClientDetails.createDefault()
val apiClient =
Expand All @@ -39,11 +37,6 @@ private constructor(private val clusterUrl: String) :
)
return StreamingIngestClient(
apiClient = apiClient,
) // Assuming these are set in BaseIngestClientBuilder
}

private fun validateParameters() {
requireNotNull(ingestionEndpoint) { "Ingestion endpoint must be set." }
requireNotNull(authentication) { "Authentication must be set." }
)
}
}
Loading
Loading