From 348a5c59d0181028505072079d18d382b8f6510e Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Thu, 19 Feb 2026 18:06:38 +0530 Subject: [PATCH 1/2] added CachingTokenCredential for concurrent azcli token handling --- .../ingest/v2/CachingTokenCredential.java | 79 +++++++++++++++++++ .../kusto/ingest/v2/IngestV2JavaTestBase.java | 4 +- .../azure/kusto/ingest/v2/IngestV2TestBase.kt | 10 ++- 3 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java new file mode 100644 index 00000000..ea388910 --- /dev/null +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2; + +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import com.azure.identity.AzureCliCredentialBuilder; +import reactor.core.publisher.Mono; + +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +/** + * A test-only {@link TokenCredential} wrapper that caches tokens per scope to prevent concurrent + * subprocess invocations when the underlying credential (e.g. {@code AzureCliCredential}) + * is called from multiple parallel test threads simultaneously. + * + *

The token is proactively refreshed {@value #TOKEN_REFRESH_MARGIN_MINUTES} minutes before + * expiry. All concurrent callers for the same scope wait on the same {@link ReentrantLock} — + * only one subprocess invocation occurs at a time (double-checked locking). + * + *

Token caching is intentionally placed here (test layer) rather than in + * {@code KustoBaseApiClient}, keeping token lifecycle management as the responsibility of the + * {@link TokenCredential} implementor. + */ +public class CachingTokenCredential implements TokenCredential { + + /** Process-wide shared instance wrapping {@code AzureCliCredential}, used by all E2E tests. */ + public static final TokenCredential INSTANCE = + new CachingTokenCredential(new AzureCliCredentialBuilder().build()); + + private static final long TOKEN_REFRESH_MARGIN_MINUTES = 5L; + + private final TokenCredential delegate; + private final ReentrantLock lock = new ReentrantLock(); + // Per-scope cache: scope string → AccessToken + private final Map tokenCache = new ConcurrentHashMap<>(); + + public CachingTokenCredential(TokenCredential delegate) { + this.delegate = delegate; + } + + @Override + public Mono getToken(TokenRequestContext request) { + String scopeKey = request.getScopes().stream().collect(Collectors.joining(",")); + + // Fast path: return cached token without acquiring lock if not near expiry + AccessToken current = tokenCache.get(scopeKey); + if (current != null && current.getExpiresAt().isAfter( + OffsetDateTime.now().plusMinutes(TOKEN_REFRESH_MARGIN_MINUTES))) { + return Mono.just(current); + } + + // Slow path: acquire lock, refresh if still needed (double-checked locking) + return Mono.fromCallable(() -> { + lock.lock(); + try { + AccessToken recheck = tokenCache.get(scopeKey); + if (recheck != null && recheck.getExpiresAt().isAfter( + OffsetDateTime.now().plusMinutes(TOKEN_REFRESH_MARGIN_MINUTES))) { + return recheck; + } + // Only one thread reaches here per scope at a time + AccessToken newToken = delegate.getToken(request).block(); + if (newToken == null) { + throw new IllegalStateException("TokenCredential.getToken() returned null"); + } + tokenCache.put(scopeKey, newToken); + return newToken; + } finally { + lock.unlock(); + } + }); + } +} \ No newline at end of file diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java index ad536b6e..ddadd8c3 100644 --- a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java @@ -4,7 +4,6 @@ package com.microsoft.azure.kusto.ingest.v2; import com.azure.core.credential.TokenCredential; -import com.azure.identity.AzureCliCredentialBuilder; import com.microsoft.azure.kusto.data.Client; import com.microsoft.azure.kusto.data.ClientFactory; import com.microsoft.azure.kusto.data.KustoResultSetTable; @@ -40,7 +39,8 @@ public abstract class IngestV2JavaTestBase { public IngestV2JavaTestBase(Class testClass) { this.logger = LoggerFactory.getLogger(testClass); - this.tokenProvider = new AzureCliCredentialBuilder().build(); + // Reuse the shared CachingTokenCredential singleton to prevent multiple concurrent requests to fetch token from azcli + this.tokenProvider = CachingTokenCredential.INSTANCE; // Get configuration from environment variables this.database = System.getenv("TEST_DATABASE") != null diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt index a0e64ae9..dcc2d44e 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt @@ -3,7 +3,6 @@ package com.microsoft.azure.kusto.ingest.v2 import com.azure.core.credential.TokenCredential -import com.azure.identity.AzureCliCredentialBuilder import com.microsoft.azure.kusto.data.Client import com.microsoft.azure.kusto.data.ClientFactory import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder @@ -21,8 +20,11 @@ import kotlin.test.assertTrue abstract class IngestV2TestBase(testClass: Class<*>) { protected val logger: Logger = LoggerFactory.getLogger(testClass) - protected val tokenProvider: TokenCredential = - AzureCliCredentialBuilder().build() + + // Shared across all test class instances via CachingTokenCredential singleton. + // Ensures az account get-access-token is invoked only once per scope. + protected val tokenProvider: TokenCredential = CachingTokenCredential.INSTANCE + protected val database = System.getenv("TEST_DATABASE") ?: "e2e" protected val dmEndpoint: String = System.getenv("DM_CONNECTION_STRING") @@ -157,4 +159,4 @@ abstract class IngestV2TestBase(testClass: Class<*>) { ) } } -} +} \ No newline at end of file From 9a5f82bd57a0bc742bb981f6b3a3e08af2649e52 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Thu, 19 Feb 2026 23:56:59 +0530 Subject: [PATCH 2/2] simplified token generation and access --- .../ingest/v2/CachingTokenCredential.java | 68 +++---------------- .../kusto/ingest/v2/IngestV2JavaTestBase.java | 5 +- .../azure/kusto/ingest/v2/IngestV2TestBase.kt | 3 +- 3 files changed, 15 insertions(+), 61 deletions(-) diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java index ea388910..c64e0d8a 100644 --- a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/CachingTokenCredential.java @@ -8,72 +8,24 @@ import com.azure.identity.AzureCliCredentialBuilder; import reactor.core.publisher.Mono; -import java.time.OffsetDateTime; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - /** - * A test-only {@link TokenCredential} wrapper that caches tokens per scope to prevent concurrent - * subprocess invocations when the underlying credential (e.g. {@code AzureCliCredential}) - * is called from multiple parallel test threads simultaneously. - * - *

The token is proactively refreshed {@value #TOKEN_REFRESH_MARGIN_MINUTES} minutes before - * expiry. All concurrent callers for the same scope wait on the same {@link ReentrantLock} — - * only one subprocess invocation occurs at a time (double-checked locking). - * - *

Token caching is intentionally placed here (test layer) rather than in - * {@code KustoBaseApiClient}, keeping token lifecycle management as the responsibility of the - * {@link TokenCredential} implementor. + * Test-only credential: calls {@code az} once at class load, reuses the token forever. */ public class CachingTokenCredential implements TokenCredential { - /** Process-wide shared instance wrapping {@code AzureCliCredential}, used by all E2E tests. */ - public static final TokenCredential INSTANCE = - new CachingTokenCredential(new AzureCliCredentialBuilder().build()); - - private static final long TOKEN_REFRESH_MARGIN_MINUTES = 5L; - - private final TokenCredential delegate; - private final ReentrantLock lock = new ReentrantLock(); - // Per-scope cache: scope string → AccessToken - private final Map tokenCache = new ConcurrentHashMap<>(); + private static final AccessToken TOKEN; - public CachingTokenCredential(TokenCredential delegate) { - this.delegate = delegate; + static { + // Runs once on main thread before any reactor threads — .block() is safe here. + TOKEN = new AzureCliCredentialBuilder().build() + .getToken(new TokenRequestContext().addScopes("https://kusto.kusto.windows.net/.default")) + .block(); } + public static final TokenCredential INSTANCE = new CachingTokenCredential(); + @Override public Mono getToken(TokenRequestContext request) { - String scopeKey = request.getScopes().stream().collect(Collectors.joining(",")); - - // Fast path: return cached token without acquiring lock if not near expiry - AccessToken current = tokenCache.get(scopeKey); - if (current != null && current.getExpiresAt().isAfter( - OffsetDateTime.now().plusMinutes(TOKEN_REFRESH_MARGIN_MINUTES))) { - return Mono.just(current); - } - - // Slow path: acquire lock, refresh if still needed (double-checked locking) - return Mono.fromCallable(() -> { - lock.lock(); - try { - AccessToken recheck = tokenCache.get(scopeKey); - if (recheck != null && recheck.getExpiresAt().isAfter( - OffsetDateTime.now().plusMinutes(TOKEN_REFRESH_MARGIN_MINUTES))) { - return recheck; - } - // Only one thread reaches here per scope at a time - AccessToken newToken = delegate.getToken(request).block(); - if (newToken == null) { - throw new IllegalStateException("TokenCredential.getToken() returned null"); - } - tokenCache.put(scopeKey, newToken); - return newToken; - } finally { - lock.unlock(); - } - }); + return Mono.just(TOKEN); } } \ No newline at end of file diff --git a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java index ddadd8c3..8227f4b7 100644 --- a/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java +++ b/ingest-v2/src/test/java/com/microsoft/azure/kusto/ingest/v2/IngestV2JavaTestBase.java @@ -116,9 +116,10 @@ public void createTables() throws Exception { } mappingBuilder.append("\n]```"); - // Create admin client + // Create admin client using the shared CachingTokenCredential to prevent + // concurrent az subprocess invocations from parallel test threads adminClusterClient = ClientFactory.createClient( - ConnectionStringBuilder.createWithAzureCli(engineEndpoint) + ConnectionStringBuilder.createWithTokenCredential(engineEndpoint, tokenProvider) ); // Execute table creation and mapping diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt index dcc2d44e..d2ffcd78 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/IngestV2TestBase.kt @@ -77,8 +77,9 @@ abstract class IngestV2TestBase(testClass: Class<*>) { .trimIndent() adminClusterClient = ClientFactory.createClient( - ConnectionStringBuilder.createWithAzureCli( + ConnectionStringBuilder.createWithTokenCredential( engineEndpoint, + tokenProvider, ), ) adminClusterClient.executeMgmt(database, createTableScript)