Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7173494
Fixes UnsupportedOperationException when using readManyByPartitionKey…
FabianMeiswinkel May 29, 2026
a442db3
Updated changelogs
FabianMeiswinkel May 29, 2026
3ba04b0
Merge branch 'main' into users/fabianm/ReadManyByPKFix
FabianMeiswinkel May 29, 2026
364d931
Update spark.yml JarStorageAccountName for ephemeral tenant rotation …
xinlian12 May 29, 2026
af2b89d
Merge branch 'upstream-main' into users/fabianm/ReadManyByPKFix
xinlian12 May 29, 2026
671d37d
Keeping FeedResponse.header final
FabianMeiswinkel May 31, 2026
ef4ac11
Merge branch 'users/fabianm/ReadManyByPKFix' of https://github.com/Fa…
FabianMeiswinkel May 31, 2026
1766d57
Merge branch 'main' into users/fabianm/ReadManyByPKFix
FabianMeiswinkel May 31, 2026
03be254
Unifying 449 to be retried client-side across Gateway modes
FabianMeiswinkel Jun 1, 2026
4169fbc
Honoring remaining timeout
FabianMeiswinkel Jun 1, 2026
352d523
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jun 1, 2026
4487103
Update CHANGELOG.md
FabianMeiswinkel Jun 1, 2026
3cdbaa7
Potential fix for pull request finding
FabianMeiswinkel Jun 1, 2026
2542491
Update GatewayRetryWithRetryPolicy.java
FabianMeiswinkel Jun 1, 2026
ff30311
Fixing test issue
FabianMeiswinkel Jun 1, 2026
625b49b
Merge branch 'main' into users/fabianm/449Retry
FabianMeiswinkel Jun 1, 2026
9c1e925
Update GatewayRetryWithRetryPolicyTest.java
FabianMeiswinkel Jun 1, 2026
d623688
Adding additional tests
FabianMeiswinkel Jun 1, 2026
3f16f27
Merge branch 'main' into users/fabianm/449Retry
FabianMeiswinkel Jun 1, 2026
e303df5
Fixing test issues
FabianMeiswinkel Jun 1, 2026
d061e7a
Merge branch 'users/fabianm/449Retry' of https://github.com/FabianMei…
FabianMeiswinkel Jun 1, 2026
31485bb
Merge branch 'main' into users/fabianm/449Retry
FabianMeiswinkel Jun 1, 2026
1c47236
Fixing tests
FabianMeiswinkel Jun 2, 2026
f4769f6
Update GatewayRetryWithRetryPolicyTest.java
FabianMeiswinkel Jun 2, 2026
2401d4c
Merge branch 'main' into users/fabianm/449Retry
FabianMeiswinkel Jun 2, 2026
6abe681
Merge branch 'users/fabianm/449Retry' of https://github.com/FabianMei…
FabianMeiswinkel Jun 2, 2026
d2786aa
Fixes Spring Boot AAD tests
FabianMeiswinkel Jun 2, 2026
ed1b60c
Merge branch 'main' into users/fabianm/449Retry
FabianMeiswinkel Jun 2, 2026
5b99e51
Update RecordingClientHttpRequestFactoryBuilderConfiguration.java
FabianMeiswinkel Jun 2, 2026
512c9ab
Merge branch 'users/fabianm/449Retry' of https://github.com/FabianMei…
FabianMeiswinkel Jun 2, 2026
fdfa730
Fixing tests issues
FabianMeiswinkel Jun 2, 2026
070a5bf
Update QueryValidationTests.java
FabianMeiswinkel Jun 2, 2026
bc11211
Merge branch 'main' into users/fabianm/449Retry
FabianMeiswinkel Jun 2, 2026
ccabf82
Fixing flaky query tests due to container creation being async
FabianMeiswinkel Jun 2, 2026
56fda90
Merge branch 'main' into users/fabianm/449Retry
FabianMeiswinkel Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.faultinjection;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
Expand Down Expand Up @@ -123,7 +124,7 @@ public static Object[][] faultInjectionServerErrorResponseProvider() {
return new Object[][]{
// faultInjectionServerError, will SDK retry, check for multi-region setup, errorStatusCode, errorSubStatusCode
{ FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, false, 500, 0 },
{ FaultInjectionServerErrorType.RETRY_WITH, false, false, 449, 0 },
{ FaultInjectionServerErrorType.RETRY_WITH, true, false, 449, 0 },
{ FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, false, 429, HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE },
{ FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, true, false, 404, 1002 },
{ FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, true, true, 503, 21008 }
Expand Down Expand Up @@ -592,9 +593,34 @@ private void validateFaultInjectionRuleApplied(
assertThat(gatewayStatistics.get("statusCode").asInt()).isEqualTo(statusCode);
assertThat(gatewayStatistics.get("subStatusCode").asInt()).isEqualTo(subStatusCode);
assertThat(gatewayStatistics.get("faultInjectionRuleId").asText()).isEqualTo(ruleId);

if (canRetryOnFaultInjectedError && statusCode == HttpConstants.StatusCodes.RETRY_WITH) {
this.validateRetryWithGatewayStatistics(gatewayStatisticsList);
}
}
}

/**
 * Validates the gateway statistics entry at index 1, which this test treats as the
 * attempt issued after a fault-injected 449 (RETRY_WITH).
 *
 * <p>The entry must exist, must not itself report status 449, and must expose an
 * {@code httpNetworkResponseTimeout} (ISO-8601 duration text) that is greater than
 * 10 seconds and no larger than {@link #getMaxRetryWithTimeout()}.
 *
 * @param gatewayStatisticsList the JSON array of gateway statistics taken from the diagnostics
 */
private void validateRetryWithGatewayStatistics(JsonNode gatewayStatisticsList) {
    final JsonNode secondAttempt = gatewayStatisticsList.get(1);
    assertThat(secondAttempt).isNotNull();

    final int secondAttemptStatusCode = secondAttempt.get("statusCode").asInt();
    assertThat(secondAttemptStatusCode).isNotEqualTo(HttpConstants.StatusCodes.RETRY_WITH);

    final JsonNode timeoutNode = secondAttempt.get("httpNetworkResponseTimeout");
    assertThat(timeoutNode).isNotNull();

    // NOTE(review): the 10 second lower bound presumably reflects a minimum response
    // timeout applied by the gateway retry path - confirm against the policy.
    final Duration appliedTimeout = Duration.parse(timeoutNode.asText());
    assertThat(appliedTimeout)
        .isGreaterThan(Duration.ofSeconds(10))
        .isLessThanOrEqualTo(this.getMaxRetryWithTimeout());
}

/**
 * Returns the upper bound used when validating the retry response timeout.
 *
 * @return 60 seconds when the account's default consistency level is
 *         {@code STRONG}, otherwise 30 seconds
 */
private Duration getMaxRetryWithTimeout() {
    final ConsistencyLevel accountConsistency =
        BridgeInternal.getContextClient(this.client).getDefaultConsistencyLevelOfAccount();

    if (accountConsistency == ConsistencyLevel.STRONG) {
        return Duration.ofSeconds(60);
    }

    return Duration.ofSeconds(30);
}

private void validateNoFaultInjectionApplied(
CosmosDiagnostics cosmosDiagnostics,
OperationType operationType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.faultinjection;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
Expand Down Expand Up @@ -116,7 +117,7 @@ public static Object[][] faultInjectionServerErrorResponseProvider() {
return new Object[][]{
// faultInjectionServerError, will SDK retry, check for multi-region setup, errorStatusCode, errorSubStatusCode
{ FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, false, 500, 0 },
{ FaultInjectionServerErrorType.RETRY_WITH, false, false, 449, 0 },
{ FaultInjectionServerErrorType.RETRY_WITH, true, false, 449, 0 },
{ FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, false, 429, HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE },
{ FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, true, false, 404, 1002 },
{ FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, true, true, 503, 21008 }
Expand Down Expand Up @@ -748,9 +749,34 @@ private void validateFaultInjectionRuleApplied(
AssertionsForClassTypes.assertThat(gatewayStatistics.get("statusCode").asInt()).isEqualTo(statusCode);
AssertionsForClassTypes.assertThat(gatewayStatistics.get("subStatusCode").asInt()).isEqualTo(subStatusCode);
AssertionsForClassTypes.assertThat(gatewayStatistics.get("faultInjectionRuleId").asText()).isEqualTo(ruleId);

if (canRetryOnFaultInjectedError && statusCode == HttpConstants.StatusCodes.RETRY_WITH) {
this.validateRetryWithGatewayStatistics(gatewayStatisticsList);
}
}
}

/**
 * Inspects the gateway statistics entry at index 1 - the attempt this test expects
 * to follow a fault-injected 449 (RETRY_WITH) response.
 *
 * <p>Asserts that the entry is present, that its status code is not 449, and that its
 * {@code httpNetworkResponseTimeout} value (parsed as an ISO-8601 duration) lies in
 * the range (10s, {@link #getMaxRetryWithTimeout()}].
 *
 * @param gatewayStatisticsList the JSON array of gateway statistics taken from the diagnostics
 */
private void validateRetryWithGatewayStatistics(JsonNode gatewayStatisticsList) {
    final JsonNode followUpStatistics = gatewayStatisticsList.get(1);
    AssertionsForClassTypes.assertThat(followUpStatistics).isNotNull();

    final int followUpStatusCode = followUpStatistics.get("statusCode").asInt();
    AssertionsForClassTypes.assertThat(followUpStatusCode)
        .isNotEqualTo(HttpConstants.StatusCodes.RETRY_WITH);

    final JsonNode responseTimeoutNode = followUpStatistics.get("httpNetworkResponseTimeout");
    AssertionsForClassTypes.assertThat(responseTimeoutNode).isNotNull();

    // NOTE(review): the 10 second floor is taken as-is from the test; its origin in the
    // retry policy is not visible here - confirm.
    final Duration responseTimeout = Duration.parse(responseTimeoutNode.asText());
    AssertionsForClassTypes.assertThat(responseTimeout)
        .isGreaterThan(Duration.ofSeconds(10))
        .isLessThanOrEqualTo(this.getMaxRetryWithTimeout());
}

/**
 * Computes the largest retry response timeout the validation accepts.
 *
 * @return {@code Duration.ofSeconds(60)} if the account default consistency level is
 *         {@code STRONG}; {@code Duration.ofSeconds(30)} for every other level
 */
private Duration getMaxRetryWithTimeout() {
    final boolean isStrongAccount =
        BridgeInternal.getContextClient(this.client).getDefaultConsistencyLevelOfAccount()
            == ConsistencyLevel.STRONG;

    final long maxTimeoutInSeconds = isStrongAccount ? 60 : 30;
    return Duration.ofSeconds(maxTimeoutInSeconds);
}

private static AccountLevelLocationContext getAccountLevelLocationContext(DatabaseAccount databaseAccount, boolean writeOnly) {
Iterator<DatabaseAccountLocation> locationIterator =
writeOnly ? databaseAccount.getWritableLocations().iterator() : databaseAccount.getReadableLocations().iterator();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import io.netty.channel.ConnectTimeoutException;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import java.net.URI;

import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Unit tests covering the decisions returned by
 * {@code GatewayRetryWithRetryPolicy.shouldRetry(...)} for 449 (RETRY_WITH) errors,
 * for other status codes, and for a network-level exception.
 */
public class GatewayRetryWithRetryPolicyTest {
    private static final int TIMEOUT = 60000;

    /**
     * Two consecutive 449 errors must both be retried; the back-off is expected in the
     * open ranges (5, 15) ms and (15, 25) ms respectively.
     */
    @Test(groups = { "unit" }, timeOut = TIMEOUT)
    public void shouldRetryGenericRetryWithException() throws Exception {
        GatewayRetryWithRetryPolicy policy = newPolicy(30);
        CosmosException error449 = BridgeInternal.createCosmosException(HttpConstants.StatusCodes.RETRY_WITH);

        ShouldRetryResult firstDecision = policy.shouldRetry(error449).block();
        assertThat(firstDecision.shouldRetry).isTrue();
        assertThat(firstDecision.nonRelatedException).isFalse();
        // NOTE(review): the meaning of the individual policyArg slots is defined by the
        // policy implementation, which is not visible here; value3 looks like an attempt counter.
        assertThat(firstDecision.policyArg.getValue0()).isFalse();
        assertThat(firstDecision.policyArg.getValue1()).isTrue();
        assertThat(firstDecision.policyArg.getValue3()).isEqualTo(1);
        validateRetryWithTimeRange(10, firstDecision, 5);

        ShouldRetryResult secondDecision = policy.shouldRetry(error449).block();
        assertThat(secondDecision.shouldRetry).isTrue();
        assertThat(secondDecision.policyArg.getValue3()).isEqualTo(2);
        validateRetryWithTimeRange(20, secondDecision, 5);
    }

    /**
     * A 410 (GONE) error is reported back as a non-related exception and is not retried.
     */
    @Test(groups = { "unit" }, timeOut = TIMEOUT)
    public void shouldNotRetryGoneException() throws Exception {
        GatewayRetryWithRetryPolicy policy = newPolicy(30);
        CosmosException error410 = BridgeInternal.createCosmosException(HttpConstants.StatusCodes.GONE);

        ShouldRetryResult decision = policy.shouldRetry(error410).block();

        assertThat(decision.shouldRetry).isFalse();
        assertThat(decision.nonRelatedException).isTrue();
        assertThat(decision.exception).isSameAs(error410);
    }

    /**
     * A 429 (TOO_MANY_REQUESTS) error is reported back as a non-related exception and is not retried.
     */
    @Test(groups = { "unit" }, timeOut = TIMEOUT)
    public void shouldNotHandleThrottlingException() throws Exception {
        GatewayRetryWithRetryPolicy policy = newPolicy(30);
        CosmosException error429 =
            BridgeInternal.createCosmosException(HttpConstants.StatusCodes.TOO_MANY_REQUESTS);

        ShouldRetryResult decision = policy.shouldRetry(error429).block();

        assertThat(decision.shouldRetry).isFalse();
        assertThat(decision.nonRelatedException).isTrue();
        assertThat(decision.exception).isSameAs(error429);
    }

    /**
     * A {@link ConnectTimeoutException} yields a retry decision with a back-off time and no exception.
     * NOTE(review): per the test name this path is presumably served by a wrapped
     * metadata retry policy - confirm in the policy implementation.
     */
    @Test(groups = { "unit" }, timeOut = TIMEOUT)
    public void shouldDelegateRetryableNetworkExceptionToMetadataPolicy() throws Exception {
        GatewayRetryWithRetryPolicy policy = newPolicy(30);

        ShouldRetryResult decision = policy.shouldRetry(new ConnectTimeoutException()).block();

        assertThat(decision.shouldRetry).isTrue();
        assertThat(decision.nonRelatedException).isFalse();
        assertThat(decision.exception).isNull();
        assertThat(decision.backOffTime).isNotNull();
    }

    /**
     * With a timeout argument of 0 the very first 449 is no longer retried and the
     * original exception is handed back (still flagged as related).
     */
    @Test(groups = { "unit" }, timeOut = TIMEOUT)
    public void shouldStopRetryingRetryWithAfterTimeout() throws Exception {
        GatewayRetryWithRetryPolicy policy = newPolicy(0);
        CosmosException error449 = BridgeInternal.createCosmosException(HttpConstants.StatusCodes.RETRY_WITH);

        ShouldRetryResult decision = policy.shouldRetry(error449).block();

        assertThat(decision.shouldRetry).isFalse();
        assertThat(decision.exception).isSameAs(error449);
        assertThat(decision.nonRelatedException).isFalse();
    }

    /**
     * Builds a policy around a fresh request and a mocked {@code GlobalEndpointManager}.
     *
     * @param timeoutArgument value passed as the third constructor argument of the policy
     *                        (the tests pass 30 or 0; presumably seconds - confirm)
     */
    private static GatewayRetryWithRetryPolicy newPolicy(int timeoutArgument) throws Exception {
        return new GatewayRetryWithRetryPolicy(
            createRequest(),
            Mockito.mock(GlobalEndpointManager.class),
            timeoutArgument);
    }

    /**
     * Creates a document read request that carries diagnostics and is routed to {@code https://localhost}.
     */
    private static RxDocumentServiceRequest createRequest() throws Exception {
        DiagnosticsClientContext diagnosticsContext = mockDiagnosticsClientContext();
        RxDocumentServiceRequest readRequest =
            RxDocumentServiceRequest.create(diagnosticsContext, OperationType.Read, ResourceType.Document);

        readRequest.requestContext.cosmosDiagnostics = diagnosticsContext.createDiagnostics();
        readRequest.requestContext.regionalRoutingContextToRoute =
            new RegionalRoutingContext(new URI("https://localhost"));

        return readRequest;
    }

    /**
     * Asserts that the back-off of {@code retryResult} lies strictly between
     * {@code expectedDelayInMs - saltValueInMs} and {@code expectedDelayInMs + saltValueInMs}.
     */
    private static void validateRetryWithTimeRange(
        int expectedDelayInMs,
        ShouldRetryResult retryResult,
        int saltValueInMs) {

        final int lowerExclusive = expectedDelayInMs - saltValueInMs;
        final int upperExclusive = expectedDelayInMs + saltValueInMs;

        assertThat(retryResult.backOffTime.toMillis()).isGreaterThan(lowerExclusive);
        assertThat(retryResult.backOffTime.toMillis()).isLessThan(upperExclusive);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.lang.reflect.Method;
import java.net.SocketException;
import java.net.URI;
import java.time.Duration;
Expand Down Expand Up @@ -596,6 +597,145 @@ public void nullAdditionalHeadersDoesNotAffectPerformRequest() throws Exception
assertThat(headers.toMap().get(HttpConstants.HttpHeaders.WORKLOAD_ID)).isNull();
}

/**
 * Sends a document read through {@code RxGatewayStoreModel} with a mocked
 * {@code HttpClient} that always fails, then checks that the captured outgoing
 * HTTP request carries the {@code NO_RETRY_449} header with value {@code "true"}.
 */
@Test(groups = "unit")
public void gatewayAddsNoRetry449Header() throws Exception {
    final URI localhost = new URI("https://localhost");
    DiagnosticsClientContext diagnosticsContext = mockDiagnosticsClientContext();

    GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
    Mockito.doReturn(new RegionalRoutingContext(localhost))
        .when(endpointManager).resolveServiceEndpoint(any());

    // The transport always fails; only the request handed to it matters for this test.
    HttpClient failingHttpClient = Mockito.mock(HttpClient.class);
    Mockito.when(failingHttpClient.send(any(), any()))
        .thenReturn(Mono.error(new ConnectTimeoutException()));

    RxGatewayStoreModel gatewayStoreModel = new RxGatewayStoreModel(
        diagnosticsContext,
        Mockito.mock(ISessionContainer.class),
        ConsistencyLevel.SESSION,
        QueryCompatibilityMode.Default,
        new UserAgentContainer(),
        endpointManager,
        failingHttpClient,
        null,
        null);

    RxDocumentServiceRequest readRequest = RxDocumentServiceRequest.createFromName(
        diagnosticsContext,
        OperationType.Read,
        "/dbs/db/colls/col/docs/doc1",
        ResourceType.Document);
    readRequest.requestContext = new DocumentServiceRequestContext();
    readRequest.requestContext.regionalRoutingContextToRoute = new RegionalRoutingContext(localhost);

    try {
        gatewayStoreModel.performRequest(readRequest).block();
        fail("Request should fail");
    } catch (Exception ignored) {
        // expected - the mocked transport always errors out
    }

    ArgumentCaptor<HttpRequest> sentRequestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
    Mockito.verify(failingHttpClient).send(sentRequestCaptor.capture(), any());

    HttpHeaders sentHeaders = ReflectionUtils.getHttpHeaders(sentRequestCaptor.getValue());
    String noRetry449Value = sentHeaders.toMap().get(HttpConstants.HttpHeaders.NO_RETRY_449);
    assertThat(noRetry449Value).isEqualTo("true");
}

/**
 * Supplies {@code NO_RETRY_449 = "false"} both through the store model's additional
 * headers and directly on the request, then checks that the HTTP request actually
 * handed to the (always failing) mocked {@code HttpClient} still carries {@code "true"}.
 */
@Test(groups = "unit")
public void gatewayOverridesNoRetry449Header() throws Exception {
    final URI localhost = new URI("https://localhost");
    DiagnosticsClientContext diagnosticsContext = mockDiagnosticsClientContext();

    GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
    Mockito.doReturn(new RegionalRoutingContext(localhost))
        .when(endpointManager).resolveServiceEndpoint(any());

    // The transport always fails; only the request handed to it matters for this test.
    HttpClient failingHttpClient = Mockito.mock(HttpClient.class);
    Mockito.when(failingHttpClient.send(any(), any()))
        .thenReturn(Mono.error(new ConnectTimeoutException()));

    Map<String, String> clientLevelHeaders = new HashMap<>();
    clientLevelHeaders.put(HttpConstants.HttpHeaders.NO_RETRY_449, "false");

    RxGatewayStoreModel gatewayStoreModel = new RxGatewayStoreModel(
        diagnosticsContext,
        Mockito.mock(ISessionContainer.class),
        ConsistencyLevel.SESSION,
        QueryCompatibilityMode.Default,
        new UserAgentContainer(),
        endpointManager,
        failingHttpClient,
        null,
        clientLevelHeaders);

    RxDocumentServiceRequest readRequest = RxDocumentServiceRequest.createFromName(
        diagnosticsContext,
        OperationType.Read,
        "/dbs/db/colls/col/docs/doc1",
        ResourceType.Document);
    readRequest.requestContext = new DocumentServiceRequestContext();
    readRequest.requestContext.regionalRoutingContextToRoute = new RegionalRoutingContext(localhost);
    // Request-level attempt to turn the header off as well.
    readRequest.getHeaders().put(HttpConstants.HttpHeaders.NO_RETRY_449, "false");

    try {
        gatewayStoreModel.performRequest(readRequest).block();
        fail("Request should fail");
    } catch (Exception ignored) {
        // expected - the mocked transport always errors out
    }

    ArgumentCaptor<HttpRequest> sentRequestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
    Mockito.verify(failingHttpClient).send(sentRequestCaptor.capture(), any());

    HttpHeaders sentHeaders = ReflectionUtils.getHttpHeaders(sentRequestCaptor.getValue());
    String noRetry449Value = sentHeaders.toMap().get(HttpConstants.HttpHeaders.NO_RETRY_449);
    assertThat(noRetry449Value).isEqualTo("true");
}

/**
 * Builds a store model with {@code SESSION} as constructor consistency, attaches a
 * mocked {@code GatewayServiceConfigurationReader} that reports {@code STRONG}, and
 * expects the private {@code getGatewayRetryWithTimeoutInSeconds} to return 60.
 */
@Test(groups = "unit")
public void gatewayRetryWithTimeoutUsesStrongConsistencyFromGatewayServiceConfigurationReader() throws Exception {
    DiagnosticsClientContext diagnosticsContext = mockDiagnosticsClientContext();

    GatewayServiceConfigurationReader strongAccountReader =
        Mockito.mock(GatewayServiceConfigurationReader.class);
    Mockito.doReturn(ConsistencyLevel.STRONG)
        .when(strongAccountReader).getDefaultConsistencyLevel();

    RxGatewayStoreModel gatewayStoreModel = new RxGatewayStoreModel(
        diagnosticsContext,
        Mockito.mock(ISessionContainer.class),
        ConsistencyLevel.SESSION,
        QueryCompatibilityMode.Default,
        new UserAgentContainer(),
        Mockito.mock(GlobalEndpointManager.class),
        Mockito.mock(HttpClient.class),
        null,
        null);
    gatewayStoreModel.setGatewayServiceConfigurationReader(strongAccountReader);

    int retryWithTimeoutInSeconds = getGatewayRetryWithTimeoutInSeconds(gatewayStoreModel);
    assertThat(retryWithTimeoutInSeconds).isEqualTo(60);
}

/**
 * Builds a store model with {@code STRONG} as constructor consistency and never sets a
 * {@code GatewayServiceConfigurationReader}; the private
 * {@code getGatewayRetryWithTimeoutInSeconds} is still expected to return 60.
 */
@Test(groups = "unit")
public void gatewayRetryWithTimeoutFallsBackToDefaultConsistencyWhenGatewayServiceConfigurationReaderIsNull()
    throws Exception {

    DiagnosticsClientContext diagnosticsContext = mockDiagnosticsClientContext();

    RxGatewayStoreModel gatewayStoreModel = new RxGatewayStoreModel(
        diagnosticsContext,
        Mockito.mock(ISessionContainer.class),
        ConsistencyLevel.STRONG,
        QueryCompatibilityMode.Default,
        new UserAgentContainer(),
        Mockito.mock(GlobalEndpointManager.class),
        Mockito.mock(HttpClient.class),
        null,
        null);

    int retryWithTimeoutInSeconds = getGatewayRetryWithTimeoutInSeconds(gatewayStoreModel);
    assertThat(retryWithTimeoutInSeconds).isEqualTo(60);
}

/**
 * Invokes the non-public, no-argument {@code getGatewayRetryWithTimeoutInSeconds}
 * method declared on {@code RxGatewayStoreModel} via reflection.
 *
 * @param storeModel the instance to invoke the method on
 * @return the value returned by the reflective call, unboxed to {@code int}
 * @throws Exception if the method cannot be looked up or the invocation fails
 */
private static int getGatewayRetryWithTimeoutInSeconds(RxGatewayStoreModel storeModel) throws Exception {
    final Method timeoutAccessor =
        RxGatewayStoreModel.class.getDeclaredMethod("getGatewayRetryWithTimeoutInSeconds");
    // The target is not accessible from the test class, so access checks are lifted first.
    timeoutAccessor.setAccessible(true);

    final Object timeoutInSeconds = timeoutAccessor.invoke(storeModel);
    return (int) timeoutInSeconds;
}

enum SessionTokenType {
NONE, // no session token applied
USER, // userControlled session token
Expand Down
Loading
Loading