diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java index f2636f313f83..eca994eaf13e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java @@ -4,6 +4,7 @@ package com.azure.cosmos.faultinjection; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosClientBuilder; @@ -123,7 +124,7 @@ public static Object[][] faultInjectionServerErrorResponseProvider() { return new Object[][]{ // faultInjectionServerError, will SDK retry, check for multi-region setup, errorStatusCode, errorSubStatusCode { FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, false, 500, 0 }, - { FaultInjectionServerErrorType.RETRY_WITH, false, false, 449, 0 }, + { FaultInjectionServerErrorType.RETRY_WITH, true, false, 449, 0 }, { FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, false, 429, HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE }, { FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, true, false, 404, 1002 }, { FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, true, true, 503, 21008 } @@ -592,9 +593,34 @@ private void validateFaultInjectionRuleApplied( assertThat(gatewayStatistics.get("statusCode").asInt()).isEqualTo(statusCode); assertThat(gatewayStatistics.get("subStatusCode").asInt()).isEqualTo(subStatusCode); assertThat(gatewayStatistics.get("faultInjectionRuleId").asText()).isEqualTo(ruleId); + + if (canRetryOnFaultInjectedError && statusCode == HttpConstants.StatusCodes.RETRY_WITH) { + this.validateRetryWithGatewayStatistics(gatewayStatisticsList); + } } } + private void validateRetryWithGatewayStatistics(JsonNode gatewayStatisticsList) { + JsonNode retryGatewayStatistics = gatewayStatisticsList.get(1); + assertThat(retryGatewayStatistics).isNotNull(); + assertThat(retryGatewayStatistics.get("statusCode").asInt()) + .isNotEqualTo(HttpConstants.StatusCodes.RETRY_WITH); + + JsonNode retryResponseTimeout = retryGatewayStatistics.get("httpNetworkResponseTimeout"); + assertThat(retryResponseTimeout).isNotNull(); + + Duration retryTimeout = Duration.parse(retryResponseTimeout.asText()); + assertThat(retryTimeout).isGreaterThan(Duration.ofSeconds(10)); + assertThat(retryTimeout).isLessThanOrEqualTo(this.getMaxRetryWithTimeout()); + } + + private Duration getMaxRetryWithTimeout() { + ConsistencyLevel consistencyLevel = BridgeInternal.getContextClient(this.client) + .getDefaultConsistencyLevelOfAccount(); + + return consistencyLevel == ConsistencyLevel.STRONG ? Duration.ofSeconds(60) : Duration.ofSeconds(30); + } + private void validateNoFaultInjectionApplied( CosmosDiagnostics cosmosDiagnostics, OperationType operationType, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java index b018161dcd0f..4dc7fd769395 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java @@ -4,6 +4,7 @@ package com.azure.cosmos.faultinjection; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosClientBuilder; @@ -116,7 +117,7 @@ public static Object[][] faultInjectionServerErrorResponseProvider() { return new Object[][]{ // faultInjectionServerError, will SDK retry, check for multi-region setup, errorStatusCode, errorSubStatusCode { FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, false, 500, 0 }, - { FaultInjectionServerErrorType.RETRY_WITH, false, false, 449, 0 }, + { FaultInjectionServerErrorType.RETRY_WITH, true, false, 449, 0 }, { FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, false, 429, HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE }, { FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, true, false, 404, 1002 }, { FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, true, true, 503, 21008 } @@ -748,9 +749,34 @@ private void validateFaultInjectionRuleApplied( AssertionsForClassTypes.assertThat(gatewayStatistics.get("statusCode").asInt()).isEqualTo(statusCode); AssertionsForClassTypes.assertThat(gatewayStatistics.get("subStatusCode").asInt()).isEqualTo(subStatusCode); AssertionsForClassTypes.assertThat(gatewayStatistics.get("faultInjectionRuleId").asText()).isEqualTo(ruleId); + + if (canRetryOnFaultInjectedError && statusCode == HttpConstants.StatusCodes.RETRY_WITH) { + this.validateRetryWithGatewayStatistics(gatewayStatisticsList); + } } } + private void validateRetryWithGatewayStatistics(JsonNode gatewayStatisticsList) { + JsonNode retryGatewayStatistics = gatewayStatisticsList.get(1); + AssertionsForClassTypes.assertThat(retryGatewayStatistics).isNotNull(); + AssertionsForClassTypes.assertThat(retryGatewayStatistics.get("statusCode").asInt()) + .isNotEqualTo(HttpConstants.StatusCodes.RETRY_WITH); + + JsonNode retryResponseTimeout = retryGatewayStatistics.get("httpNetworkResponseTimeout"); + AssertionsForClassTypes.assertThat(retryResponseTimeout).isNotNull(); + + Duration retryTimeout = Duration.parse(retryResponseTimeout.asText()); + AssertionsForClassTypes.assertThat(retryTimeout).isGreaterThan(Duration.ofSeconds(10)); + AssertionsForClassTypes.assertThat(retryTimeout).isLessThanOrEqualTo(this.getMaxRetryWithTimeout()); + } + + private Duration getMaxRetryWithTimeout() { + ConsistencyLevel consistencyLevel = BridgeInternal.getContextClient(this.client) + .getDefaultConsistencyLevelOfAccount(); + + return consistencyLevel == ConsistencyLevel.STRONG ? Duration.ofSeconds(60) : Duration.ofSeconds(30); + } + private static AccountLevelLocationContext getAccountLevelLocationContext(DatabaseAccount databaseAccount, boolean writeOnly) { Iterator locationIterator = writeOnly ? databaseAccount.getWritableLocations().iterator() : databaseAccount.getReadableLocations().iterator(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/GatewayRetryWithRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/GatewayRetryWithRetryPolicyTest.java new file mode 100644 index 000000000000..46e0ef24e9e5 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/GatewayRetryWithRetryPolicyTest.java @@ -0,0 +1,123 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.routing.RegionalRoutingContext; +import io.netty.channel.ConnectTimeoutException; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import java.net.URI; + +import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext; +import static org.assertj.core.api.Assertions.assertThat; + +public class GatewayRetryWithRetryPolicyTest { + private static final int TIMEOUT = 60000; + + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldRetryGenericRetryWithException() throws Exception { + GatewayRetryWithRetryPolicy retryPolicy = new GatewayRetryWithRetryPolicy( + createRequest(), + Mockito.mock(GlobalEndpointManager.class), + 30); + + CosmosException retryWithException = BridgeInternal.createCosmosException(HttpConstants.StatusCodes.RETRY_WITH); + + ShouldRetryResult shouldRetryResult = retryPolicy.shouldRetry(retryWithException).block(); + assertThat(shouldRetryResult.shouldRetry).isTrue(); + assertThat(shouldRetryResult.nonRelatedException).isFalse(); + assertThat(shouldRetryResult.policyArg.getValue0()).isFalse(); + assertThat(shouldRetryResult.policyArg.getValue1()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); + validateRetryWithTimeRange(10, shouldRetryResult, 5); + + shouldRetryResult = retryPolicy.shouldRetry(retryWithException).block(); + assertThat(shouldRetryResult.shouldRetry).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(2); + validateRetryWithTimeRange(20, shouldRetryResult, 5); + } + + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldNotRetryGoneException() throws Exception { + GatewayRetryWithRetryPolicy retryPolicy = new GatewayRetryWithRetryPolicy( + createRequest(), + Mockito.mock(GlobalEndpointManager.class), + 30); + + CosmosException goneException = BridgeInternal.createCosmosException(HttpConstants.StatusCodes.GONE); + + ShouldRetryResult shouldRetryResult = retryPolicy.shouldRetry(goneException).block(); + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(shouldRetryResult.nonRelatedException).isTrue(); + assertThat(shouldRetryResult.exception).isSameAs(goneException); + } + + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldNotHandleThrottlingException() throws Exception { + GatewayRetryWithRetryPolicy retryPolicy = new GatewayRetryWithRetryPolicy( + createRequest(), + Mockito.mock(GlobalEndpointManager.class), + 30); + + CosmosException throttlingException = BridgeInternal.createCosmosException(HttpConstants.StatusCodes.TOO_MANY_REQUESTS); + + ShouldRetryResult shouldRetryResult = retryPolicy.shouldRetry(throttlingException).block(); + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(shouldRetryResult.nonRelatedException).isTrue(); + assertThat(shouldRetryResult.exception).isSameAs(throttlingException); + } + + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldDelegateRetryableNetworkExceptionToMetadataPolicy() throws Exception { + GatewayRetryWithRetryPolicy retryPolicy = new GatewayRetryWithRetryPolicy( + createRequest(), + Mockito.mock(GlobalEndpointManager.class), + 30); + + ShouldRetryResult shouldRetryResult = retryPolicy.shouldRetry(new ConnectTimeoutException()).block(); + assertThat(shouldRetryResult.shouldRetry).isTrue(); + assertThat(shouldRetryResult.nonRelatedException).isFalse(); + assertThat(shouldRetryResult.exception).isNull(); + assertThat(shouldRetryResult.backOffTime).isNotNull(); + } + + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldStopRetryingRetryWithAfterTimeout() throws Exception { + GatewayRetryWithRetryPolicy retryPolicy = new GatewayRetryWithRetryPolicy( + createRequest(), + Mockito.mock(GlobalEndpointManager.class), + 0); + + CosmosException retryWithException = BridgeInternal.createCosmosException(HttpConstants.StatusCodes.RETRY_WITH); + + ShouldRetryResult shouldRetryResult = retryPolicy.shouldRetry(retryWithException).block(); + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(shouldRetryResult.exception).isSameAs(retryWithException); + assertThat(shouldRetryResult.nonRelatedException).isFalse(); + } + + private static RxDocumentServiceRequest createRequest() throws Exception { + DiagnosticsClientContext clientContext = mockDiagnosticsClientContext(); + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + clientContext, + OperationType.Read, + ResourceType.Document); + + request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics(); + request.requestContext.regionalRoutingContextToRoute = new RegionalRoutingContext(new URI("https://localhost")); + return request; + } + + private static void validateRetryWithTimeRange( + int expectedDelayInMs, + ShouldRetryResult retryResult, + int saltValueInMs) { + + assertThat(retryResult.backOffTime.toMillis()).isGreaterThan(expectedDelayInMs - saltValueInMs); + assertThat(retryResult.backOffTime.toMillis()).isLessThan(expectedDelayInMs + saltValueInMs); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java index edf16806489c..9d67473cced5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java @@ -24,6 +24,7 @@ import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; +import java.lang.reflect.Method; import java.net.SocketException; import java.net.URI; import java.time.Duration; @@ -596,6 +597,145 @@ public void nullAdditionalHeadersDoesNotAffectPerformRequest() throws Exception assertThat(headers.toMap().get(HttpConstants.HttpHeaders.WORKLOAD_ID)).isNull(); } + @Test(groups = "unit") + public void gatewayAddsNoRetry449Header() throws Exception { + DiagnosticsClientContext clientContext = mockDiagnosticsClientContext(); + ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); + GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class); + + Mockito.doReturn(new RegionalRoutingContext(new URI("https://localhost"))) + .when(globalEndpointManager).resolveServiceEndpoint(any()); + + HttpClient httpClient = Mockito.mock(HttpClient.class); + ArgumentCaptor httpClientRequestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + Mockito.when(httpClient.send(any(), any())).thenReturn(Mono.error(new ConnectTimeoutException())); + + RxGatewayStoreModel storeModel = new RxGatewayStoreModel( + clientContext, + sessionContainer, + ConsistencyLevel.SESSION, + QueryCompatibilityMode.Default, + new UserAgentContainer(), + globalEndpointManager, + httpClient, + null, + null); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( + clientContext, + OperationType.Read, + "/dbs/db/colls/col/docs/doc1", + ResourceType.Document); + request.requestContext = new DocumentServiceRequestContext(); + request.requestContext.regionalRoutingContextToRoute = new RegionalRoutingContext(new URI("https://localhost")); + + try { + storeModel.performRequest(request).block(); + fail("Request should fail"); + } catch (Exception expectedException) { + // expected + } + + Mockito.verify(httpClient).send(httpClientRequestCaptor.capture(), any()); + HttpHeaders headers = ReflectionUtils.getHttpHeaders(httpClientRequestCaptor.getValue()); + assertThat(headers.toMap().get(HttpConstants.HttpHeaders.NO_RETRY_449)).isEqualTo("true"); + } + + @Test(groups = "unit") + public void gatewayOverridesNoRetry449Header() throws Exception { + DiagnosticsClientContext clientContext = mockDiagnosticsClientContext(); + ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); + GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class); + + Mockito.doReturn(new RegionalRoutingContext(new URI("https://localhost"))) + .when(globalEndpointManager).resolveServiceEndpoint(any()); + + HttpClient httpClient = Mockito.mock(HttpClient.class); + ArgumentCaptor httpClientRequestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + Mockito.when(httpClient.send(any(), any())).thenReturn(Mono.error(new ConnectTimeoutException())); + + Map additionalHeaders = new HashMap<>(); + additionalHeaders.put(HttpConstants.HttpHeaders.NO_RETRY_449, "false"); + RxGatewayStoreModel storeModel = new RxGatewayStoreModel( + clientContext, + sessionContainer, + ConsistencyLevel.SESSION, + QueryCompatibilityMode.Default, + new UserAgentContainer(), + globalEndpointManager, + httpClient, + null, + additionalHeaders); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( + clientContext, + OperationType.Read, + "/dbs/db/colls/col/docs/doc1", + ResourceType.Document); + request.requestContext = new DocumentServiceRequestContext(); + request.requestContext.regionalRoutingContextToRoute = new RegionalRoutingContext(new URI("https://localhost")); + request.getHeaders().put(HttpConstants.HttpHeaders.NO_RETRY_449, "false"); + + try { + storeModel.performRequest(request).block(); + fail("Request should fail"); + } catch (Exception expectedException) { + // expected + } + + Mockito.verify(httpClient).send(httpClientRequestCaptor.capture(), any()); + HttpHeaders headers = ReflectionUtils.getHttpHeaders(httpClientRequestCaptor.getValue()); + assertThat(headers.toMap().get(HttpConstants.HttpHeaders.NO_RETRY_449)).isEqualTo("true"); + } + + @Test(groups = "unit") + public void gatewayRetryWithTimeoutUsesStrongConsistencyFromGatewayServiceConfigurationReader() throws Exception { + DiagnosticsClientContext clientContext = mockDiagnosticsClientContext(); + GatewayServiceConfigurationReader gatewayServiceConfigurationReader = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.doReturn(ConsistencyLevel.STRONG) + .when(gatewayServiceConfigurationReader).getDefaultConsistencyLevel(); + + RxGatewayStoreModel storeModel = new RxGatewayStoreModel( + clientContext, + Mockito.mock(ISessionContainer.class), + ConsistencyLevel.SESSION, + QueryCompatibilityMode.Default, + new UserAgentContainer(), + Mockito.mock(GlobalEndpointManager.class), + Mockito.mock(HttpClient.class), + null, + null); + storeModel.setGatewayServiceConfigurationReader(gatewayServiceConfigurationReader); + + assertThat(getGatewayRetryWithTimeoutInSeconds(storeModel)).isEqualTo(60); + } + + @Test(groups = "unit") + public void gatewayRetryWithTimeoutFallsBackToDefaultConsistencyWhenGatewayServiceConfigurationReaderIsNull() + throws Exception { + + RxGatewayStoreModel storeModel = new RxGatewayStoreModel( + mockDiagnosticsClientContext(), + Mockito.mock(ISessionContainer.class), + ConsistencyLevel.STRONG, + QueryCompatibilityMode.Default, + new UserAgentContainer(), + Mockito.mock(GlobalEndpointManager.class), + Mockito.mock(HttpClient.class), + null, + null); + + assertThat(getGatewayRetryWithTimeoutInSeconds(storeModel)).isEqualTo(60); + } + + private static int getGatewayRetryWithTimeoutInSeconds(RxGatewayStoreModel storeModel) throws Exception { + Method getGatewayRetryWithTimeoutInSeconds = RxGatewayStoreModel.class + .getDeclaredMethod("getGatewayRetryWithTimeoutInSeconds"); + getGatewayRetryWithTimeoutInSeconds.setAccessible(true); + + return (int) getGatewayRetryWithTimeoutInSeconds.invoke(storeModel); + } + enum SessionTokenType { NONE, // no session token applied USER, // userControlled session token diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java index ec6315f209b2..f657036d81f5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java @@ -1,7 +1,9 @@ package com.azure.cosmos.implementation; import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; @@ -190,6 +192,68 @@ public void testNullAdditionalHeadersThinClientStoreModel() throws Exception { .isNull(); } + @Test(groups = "unit") + public void thinClientDoesNotAddNoRetry449Header() throws Exception { + DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class); + Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig(); + Mockito + .doReturn(ImplementationBridgeHelpers + .CosmosDiagnosticsHelper + .getCosmosDiagnosticsAccessor() + .create(clientContext, 1d)) + .when(clientContext).createDiagnostics(); + + ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); + Mockito.doReturn("1#100#1=20#2=5#3=30").when(sessionContainer).resolveGlobalSessionToken(any()); + + RegionalRoutingContext regionalRoutingContext = new RegionalRoutingContext(new URI("https://localhost:8080")); + regionalRoutingContext.setThinclientRegionalEndpoint(new URI("https://localhost:8081")); + GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class); + Mockito.doReturn(regionalRoutingContext).when(globalEndpointManager).resolveServiceEndpoint(any()); + DatabaseAccount databaseAccount = Mockito.mock(DatabaseAccount.class); + Mockito.doReturn("test-account").when(databaseAccount).getId(); + Mockito.doReturn(databaseAccount).when(globalEndpointManager).getLatestDatabaseAccount(); + + HttpClient httpClient = Mockito.mock(HttpClient.class); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + Mockito.when(httpClient.send(requestCaptor.capture(), any())) + .thenReturn(Mono.error(new ConnectTimeoutException())); + + ThinClientStoreModel storeModel = new ThinClientStoreModel( + clientContext, + sessionContainer, + ConsistencyLevel.SESSION, + new UserAgentContainer(), + globalEndpointManager, + httpClient, + null); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( + clientContext, + OperationType.Read, + "/dbs/db/colls/col/docs/doc1", + ResourceType.Document); + PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition(); + partitionKeyDefinition.setPaths(Collections.singletonList("/partitionKey")); + request.setPartitionKeyDefinition(partitionKeyDefinition); + request.setPartitionKeyInternal(PartitionKeyInternal.fromObjectArray(Collections.singletonList("testPk"), true)); + + try { + storeModel.performRequest(request).block(); + } catch (Exception e) { + // Expected - mock HTTP client throws ConnectTimeoutException + } + + assertThat(request.getHeaders().get(HttpConstants.HttpHeaders.NO_RETRY_449)) + .as("ThinClient request headers should not include the Gateway V1 no-retry-449 header") + .isNull(); + + HttpHeaders httpHeaders = ReflectionUtils.getHttpHeaders(requestCaptor.getValue()); + assertThat(httpHeaders.toMap().get(HttpConstants.HttpHeaders.NO_RETRY_449)) + .as("ThinClient HTTP framing headers should not include the Gateway V1 no-retry-449 header") + .isNull(); + } + @Test(groups = "unit") public void cloneShouldPreservePartitionKeyDefinition() { DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java index 7c5c9be002bd..213a71ebff59 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java @@ -19,8 +19,6 @@ import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.ShouldRetryResult; import com.azure.cosmos.implementation.guava25.base.Supplier; -import com.azure.cosmos.implementation.RetryWithException; -import org.mockito.Mockito; import org.testng.annotations.Test; import reactor.core.publisher.Mono; @@ -356,7 +354,7 @@ public void retryWithDefaultTimeouts() { ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - RetryWithException retryWithException = Mockito.mock(RetryWithException.class); + RetryWithException retryWithException = new RetryWithException("Test", null, null); Mono singleShouldRetry = goneAndRetryWithRetryPolicy.shouldRetry(retryWithException); ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java index 5895088fd528..c632fd06e315 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java @@ -45,6 +45,7 @@ import static com.azure.cosmos.rx.TestSuiteBase.createDatabase; import static com.azure.cosmos.rx.TestSuiteBase.safeClose; import static com.azure.cosmos.rx.TestSuiteBase.safeDeleteDatabase; +import static com.azure.cosmos.rx.TestSuiteBase.waitForCollectionToBeAvailableToRead; import static org.assertj.core.api.Assertions.assertThat; import com.azure.cosmos.SuperFlakyTestRetryAnalyzer; @@ -101,6 +102,8 @@ public void before_NonStreamingOrderByQueryVectorSearchTest() { database.createContainer(containerProperties).block(); largeDataContainer = database.getContainer(largeDataContainerId); + waitForCollectionToBeAvailableToRead(); + for (Document doc : getVectorDocs()) { flatIndexContainer.createItem(doc).block(); quantizedIndexContainer.createItem(doc).block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index 1b8c48f8d845..16722fd466bd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -688,6 +688,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { })) .block(); roundTripsContainer = createdDatabase.getContainer(containerName); + waitForCollectionToBeAvailableToRead(); setupRoundTripContainer(); List> keyValuePropsList = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index 31e380625577..966890825de4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -126,6 +126,7 @@ public void orderByQueryForLargeCollection() { ).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerProperties.getId()); + waitForCollectionToBeAvailableToRead(); int partitionDocCount = 5; int pageSize = partitionDocCount + 1; @@ -381,6 +382,7 @@ public void splitQueryContinuationToken() throws Exception { CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/mypk"); CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerId); + waitForCollectionToBeAvailableToRead(); AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(this.client); //Insert some documents @@ -492,6 +494,7 @@ public void orderbyContinuationOnUndefinedAndNull() throws Exception { createdDatabase.createContainer(containerProperties, new CosmosContainerRequestOptions()).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerProperties.getId()); + waitForCollectionToBeAvailableToRead(); CosmosContainerResponse containerResponse = container.read().block(); assert (containerResponse != null); CosmosContainerProperties properties = containerResponse.getProperties(); @@ -579,6 +582,7 @@ public void queryLargePartitionKeyOn100BPKCollection() throws Exception { CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/id"); CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerId); + waitForCollectionToBeAvailableToRead(); //id as partitionkey > 100bytes String itemID1 = "cosmosdb" + "-drWarm4Z60GkknMfHLo5BwuiH7w6AffzSb9jKbvwAQwaRZd10oxnLeCueuyZ5gbm9dwVVAqJLdzrB38Dk73Q6xMErv-0"; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java index ad3c980c6f93..e081fee801b4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java @@ -88,6 +88,7 @@ public CosmosAsyncContainer createCollections(CosmosAsyncDatabase database) { partitionKeyDef.setPaths(paths); CosmosContainerProperties containerProperties = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); database.createContainer(containerProperties, new CosmosContainerRequestOptions()).block(); + waitForCollectionToBeAvailableToRead(); return database.getContainer(containerProperties.getId()); } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 2b213a5c5683..0e84dde4c2cf 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -566,16 +566,26 @@ public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database .getCosmosAsyncClientAccessor() .getPreferredRegions(client).size() > 1; if (throughput > 6000 || isMultiRegional) { - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + waitForCollectionToBeAvailableToRead(); } return database.getContainer(cosmosContainerProperties.getId()); } + protected static void waitForCollectionToBeAvailableToRead() { + // Creating a container is an async task - especially with multiple regions it can + // take some time until the container is available in the remote regions as well. + // When the container does not exist yet, metadata reads or item operations can + // fail with 404/1013 "Collection is not yet available for read". + // So, adding this delay after container creation to minimize risk of hitting these errors. + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties, CosmosContainerRequestOptions options) { database.createContainer(cosmosContainerProperties, options) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 66f38301cb6c..3c1f0975a420 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -14,6 +14,7 @@ #### Other Changes * Replaced per-client `Schedulers.newSingle()` schedulers in `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` with shared `BoundedElastic` schedulers in `CosmosSchedulers` to prevent thread count from scaling linearly with client/tenant count. - See [PR 49062](https://github.com/Azure/azure-sdk-for-java/pull/49062) * Fixed a sporadic `NullPointerException` in `JsonSerializable.getWithMapping` triggered by concurrent first-time calls to `DatabaseAccount.getConsistencyPolicy()` and its sibling lazy getters (`getReplicationPolicy`, `getSystemReplicationPolicy`, `getQueryEngineConfiguration`). The fix makes `JsonSerializable.propertyBag` `final`, closing an unsafe-publication race in the lazy-initialisation pattern. - See [Issue 49256](https://github.com/Azure/azure-sdk-for-java/issues/49256) and [PR #49258](https://github.com/Azure/azure-sdk-for-java/pull/49258) +* Changed 449 (`Retry With`) retries in Gateway V1 and Gateway V2 to be consistently orchestrated client-side. - See [PR 49332](https://github.com/Azure/azure-sdk-for-java/pull/49332) ### 4.80.0 (2026-05-01) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java index 3bc37ff48d2c..b29c7651b631 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java @@ -94,6 +94,10 @@ public static boolean isCommonlyExpectedExceptionPossiblyCausingNoisyLogs(int st return true; } + if (statusCode == HttpConstants.StatusCodes.RETRY_WITH) { + return true; + } + return false; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GatewayRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GatewayRetryWithRetryPolicy.java new file mode 100644 index 000000000000..a0573b4ab2b9 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GatewayRetryWithRetryPolicy.java @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation; + +import com.azure.cosmos.BridgeInternal; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.time.Instant; + +public class GatewayRetryWithRetryPolicy implements IRetryPolicy { + private final RetryWithRetryPolicy retryWithRetryPolicy; + private final MetadataRequestRetryPolicy metadataRequestRetryPolicy; + private final Instant start; + + public GatewayRetryWithRetryPolicy( + RxDocumentServiceRequest request, + GlobalEndpointManager globalEndpointManager, + Integer waitTimeInSeconds) { + + this.start = Instant.now(); + RetryContext retryContext = BridgeInternal.getRetryContext(request.requestContext.cosmosDiagnostics); + this.retryWithRetryPolicy = new RetryWithRetryPolicy( + waitTimeInSeconds, + retryContext, + () -> Duration.between(this.start, Instant.now()).toMillis(), + null); + this.metadataRequestRetryPolicy = new MetadataRequestRetryPolicy(globalEndpointManager); + this.metadataRequestRetryPolicy.onBeforeSendRequest(request); + } + + @Override + public Mono shouldRetry(Exception exception) { + return this.retryWithRetryPolicy.shouldRetry(exception).flatMap(retryWithResult -> { + if (!retryWithResult.nonRelatedException) { + return Mono.just(retryWithResult); + } + + return this.metadataRequestRetryPolicy.shouldRetry(exception).map(metadataRequestRetryResult -> { + if (metadataRequestRetryResult.shouldRetry || metadataRequestRetryResult.nonRelatedException) { + return metadataRequestRetryResult; + } + + if (metadataRequestRetryResult.exception != null) { + return ShouldRetryResult.errorOnNonRelatedException(metadataRequestRetryResult.exception); + } + + return ShouldRetryResult.noRetryOnNonRelatedException(); + }); + }); + } + + @Override + public RetryContext getRetryContext() { + return this.retryWithRetryPolicy.getRetryContext(); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 859acf9a748f..570bdb010f25 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -119,6 +119,7 @@ public static class HttpHeaders { public static final String REQUEST_VALIDATION_FAILURE = "x-ms-request-validation-failure"; public static final String WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH = "x-ms-write-request-trigger-refresh"; + public static final String NO_RETRY_449 = "x-ms-noretry-449"; // Quota Info public static final String MAX_RESOURCE_QUOTA = "x-ms-resource-quota"; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryWithRetryPolicy.java new file mode 100644 index 000000000000..079985b48cda --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryWithRetryPolicy.java @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation; + +import com.azure.cosmos.CosmosException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.LongSupplier; + +public class RetryWithRetryPolicy implements IRetryPolicy { + private final static Logger logger = LoggerFactory.getLogger(RetryWithRetryPolicy.class); + private final static int DEFAULT_WAIT_TIME_IN_SECONDS = 30; + private final static int MAXIMUM_BACKOFF_TIME_IN_MS = 1000; + private final static int INITIAL_BACKOFF_TIME_MS = 10; + private final static int BACK_OFF_MULTIPLIER = 2; + private final static int RANDOM_SALT_IN_MS = 5; + + private final AtomicInteger attemptCount = new AtomicInteger(1); + private final AtomicInteger currentBackoffMilliseconds = + new AtomicInteger(RetryWithRetryPolicy.INITIAL_BACKOFF_TIME_MS); + private final int waitTimeInSeconds; + private final RetryContext retryContext; + private final LongSupplier elapsedTimeInMillisSupplier; + private final Consumer retryWithExceptionConsumer; + + public RetryWithRetryPolicy( + Integer waitTimeInSeconds, + RetryContext retryContext, + LongSupplier elapsedTimeInMillisSupplier, + Consumer retryWithExceptionConsumer) { + + this.waitTimeInSeconds = waitTimeInSeconds != null ? waitTimeInSeconds : DEFAULT_WAIT_TIME_IN_SECONDS; + this.retryContext = retryContext; + this.elapsedTimeInMillisSupplier = elapsedTimeInMillisSupplier; + this.retryWithExceptionConsumer = retryWithExceptionConsumer; + } + + @Override + public Mono shouldRetry(Exception exception) { + CosmosException cosmosException = Utils.as(exception, CosmosException.class); + if (cosmosException == null || cosmosException.getStatusCode() != HttpConstants.StatusCodes.RETRY_WITH) { + logger.debug("Operation will NOT be retried. Current attempt {}, Exception: ", this.attemptCount.get(), + exception); + return Mono.just(ShouldRetryResult.noRetryOnNonRelatedException()); + } + + if (this.retryWithExceptionConsumer != null) { + this.retryWithExceptionConsumer.accept(cosmosException); + } + + long remainingMilliseconds = + (this.waitTimeInSeconds * 1_000L) - this.elapsedTimeInMillisSupplier.getAsLong(); + int currentRetryAttemptCount = this.attemptCount.getAndIncrement(); + + if (remainingMilliseconds <= 0) { + logger.warn("Received RetryWith response after backoff/retry. Will fail the request.", cosmosException); + return Mono.just(ShouldRetryResult.error(cosmosException)); + } + + Duration backoffTime = Duration.ofMillis( + Math.min( + Math.min( + this.currentBackoffMilliseconds.get() + + ThreadLocalRandom.current().nextInt(RetryWithRetryPolicy.RANDOM_SALT_IN_MS), + remainingMilliseconds), + RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS)); + + this.currentBackoffMilliseconds.set( + Math.max( + RetryWithRetryPolicy.INITIAL_BACKOFF_TIME_MS, + Math.min( + RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS, + this.currentBackoffMilliseconds.get() * RetryWithRetryPolicy.BACK_OFF_MULTIPLIER)) + ); + + logger.debug("BackoffTime: {} ms.", backoffTime.toMillis()); + + long timeoutInMillSec = remainingMilliseconds - backoffTime.toMillis(); + Duration timeout = timeoutInMillSec > 0 ? Duration.ofMillis(timeoutInMillSec) + : Duration.ofMillis(RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS); + + logger.debug("Received RetryWith response, will retry, ", exception); + + return Mono.just(ShouldRetryResult.retryAfter(backoffTime, + Quadruple.with(false, true, timeout, currentRetryAttemptCount))); + } + + @Override + public RetryContext getRetryContext() { + return this.retryContext; + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 4afa44571ba0..e2431570e0c6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -46,6 +46,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.HashMap; @@ -53,8 +54,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -73,6 +74,8 @@ private static ImplementationBridgeHelpers.CosmosExceptionHelper.CosmosException private static final boolean leakDetectionDebuggingEnabled = ResourceLeakDetector.getLevel().ordinal() >= ResourceLeakDetector.Level.ADVANCED.ordinal(); private static final boolean HTTP_CONNECTION_WITHOUT_TLS_ALLOWED = Configs.isHttpConnectionWithoutTLSAllowed(); + private static final int GATEWAY_RETRY_WITH_TIMEOUT_IN_SECONDS = 30; + private static final int STRONG_GATEWAY_RETRY_WITH_TIMEOUT_IN_SECONDS = 60; private static final List headersNeedToBeEscaped = Arrays.asList( HttpConstants.HttpHeaders.PARTITION_KEY, HttpConstants.HttpHeaders.POST_TRIGGER_EXCLUDE, @@ -298,6 +301,8 @@ public Mono performRequest(RxDocumentServiceRequest r } } + this.applyGatewayRetryWithHeaders(request); + URI uri = getUri(request); request.requestContext.resourcePhysicalAddress = uri.toString(); @@ -315,6 +320,10 @@ protected boolean partitionKeyRangeResolutionNeeded(RxDocumentServiceRequest req return false; } + protected void applyGatewayRetryWithHeaders(RxDocumentServiceRequest request) { + request.getHeaders().put(HttpConstants.HttpHeaders.NO_RETRY_449, "true"); + } + /** * Given the request it creates an flux which upon subscription issues HTTP call and emits one RxDocumentServiceResponse. * @@ -794,12 +803,63 @@ private Mono invokeAsyncInternal(RxDocumentServiceReq private Mono invokeAsync(RxDocumentServiceRequest request) { - Callable> funcDelegate = () -> invokeAsyncInternal(request).single(); + if (request.requestContext.cosmosDiagnostics == null) { + request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics(); + } + + Function, Mono> funcDelegate = + retryPolicyArg -> { + this.applyGatewayRetryPolicyArg(request, retryPolicyArg); + return invokeAsyncInternal(request).single(); + }; + + GatewayRetryWithRetryPolicy gatewayRetryWithRetryPolicy = new GatewayRetryWithRetryPolicy( + request, + this.globalEndpointManager, + this.getGatewayRetryWithTimeoutInSeconds()); + + return BackoffRetryUtility.executeAsync( + funcDelegate, + gatewayRetryWithRetryPolicy, + null, + Duration.ZERO, + request, + null); + } + + private void applyGatewayRetryPolicyArg( + RxDocumentServiceRequest request, + Quadruple retryPolicyArg) { + + if (retryPolicyArg == null || !Boolean.TRUE.equals(retryPolicyArg.getValue1())) { + return; + } + + Duration remainingTime = retryPolicyArg.getValue2(); + Integer retryAttemptCount = retryPolicyArg.getValue3(); + + if (remainingTime != null) { + request.setResponseTimeout(remainingTime); + request.getHeaders().put( + HttpConstants.HttpHeaders.REMAINING_TIME_IN_MS_ON_CLIENT_REQUEST, + Long.toString(remainingTime.toMillis())); + } + + if (retryAttemptCount != null) { + request.getHeaders().put( + HttpConstants.HttpHeaders.CLIENT_RETRY_ATTEMPT_COUNT, + retryAttemptCount.toString()); + } + } - MetadataRequestRetryPolicy metadataRequestRetryPolicy = new MetadataRequestRetryPolicy(this.globalEndpointManager); - metadataRequestRetryPolicy.onBeforeSendRequest(request); + private int getGatewayRetryWithTimeoutInSeconds() { + ConsistencyLevel effectiveConsistencyLevel = this.gatewayServiceConfigurationReader != null + ? this.gatewayServiceConfigurationReader.getDefaultConsistencyLevel() + : this.defaultConsistencyLevel; - return BackoffRetryUtility.executeRetry(funcDelegate, metadataRequestRetryPolicy); + return effectiveConsistencyLevel == ConsistencyLevel.STRONG + ? STRONG_GATEWAY_RETRY_WITH_TIMEOUT_IN_SECONDS + : GATEWAY_RETRY_WITH_TIMEOUT_IN_SECONDS; } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index c90b6940fef1..3a849b4c5baa 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -78,6 +78,11 @@ public Mono processMessage(RxDocumentServiceRequest r return super.processMessage(request); } + @Override + protected void applyGatewayRetryWithHeaders(RxDocumentServiceRequest request) { + // ThinClient does not use the Gateway V1 server-side 449 retry loop. + } + @Override protected Map getDefaultHeaders( ApiType apiType, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 5bfe3d8078dd..1822baf02b23 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -16,6 +16,7 @@ import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException; import com.azure.cosmos.implementation.Quadruple; import com.azure.cosmos.implementation.RetryContext; +import com.azure.cosmos.implementation.RetryWithRetryPolicy; import com.azure.cosmos.implementation.RetryWithException; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.ShouldRetryResult; @@ -27,7 +28,6 @@ import java.time.Duration; import java.time.Instant; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -46,7 +46,6 @@ private static ImplementationBridgeHelpers.CosmosExceptionHelper.CosmosException private volatile RetryWithException lastRetryWithException; private RetryContext retryContext; - private static final ThreadLocalRandom random = ThreadLocalRandom.current(); public GoneAndRetryWithRetryPolicy(RxDocumentServiceRequest request, Integer waitTimeInSeconds) { this.retryContext = BridgeInternal.getRetryContext(request.requestContext.cosmosDiagnostics); @@ -55,8 +54,16 @@ public GoneAndRetryWithRetryPolicy(RxDocumentServiceRequest request, Integer wai waitTimeInSeconds, this.retryContext ); - this.retryWithRetryPolicy = new RetryWithRetryPolicy(waitTimeInSeconds, this.retryContext); this.start = Instant.now(); + this.retryWithRetryPolicy = new RetryWithRetryPolicy( + waitTimeInSeconds, + this.retryContext, + () -> GoneAndRetryWithRetryPolicy.this.getElapsedTime().toMillis(), + cosmosException -> { + if (cosmosException instanceof RetryWithException) { + GoneAndRetryWithRetryPolicy.this.lastRetryWithException = (RetryWithException) cosmosException; + } + }); } @Override @@ -311,81 +318,4 @@ private Pair, Boolean> handlePartitionKeyIsSplittingExce } } - class RetryWithRetryPolicy implements IRetryPolicy { - private final static int DEFAULT_WAIT_TIME_IN_SECONDS = 30; - private final static int MAXIMUM_BACKOFF_TIME_IN_MS = 1000; - private final static int INITIAL_BACKOFF_TIME_MS = 10; - private final static int BACK_OFF_MULTIPLIER = 2; - private final static int RANDOM_SALT_IN_MS = 5; - - private final AtomicInteger attemptCount = new AtomicInteger(1); - private final AtomicInteger currentBackoffMilliseconds = new AtomicInteger(RetryWithRetryPolicy.INITIAL_BACKOFF_TIME_MS); - - private final int waitTimeInSeconds; - private final RetryContext retryContext; - - public RetryWithRetryPolicy(Integer waitTimeInSeconds, RetryContext retryContext) { - this.waitTimeInSeconds = waitTimeInSeconds != null ? waitTimeInSeconds : DEFAULT_WAIT_TIME_IN_SECONDS; - this.retryContext = retryContext; - } - - @Override - public Mono shouldRetry(Exception exception) { - Duration backoffTime; - Duration timeout; - - if (!(exception instanceof RetryWithException)) { - logger.debug("Operation will NOT be retried. Current attempt {}, Exception: ", this.attemptCount.get(), - exception); - return Mono.just(ShouldRetryResult.noRetryOnNonRelatedException()); - } - - RetryWithException lastRetryWithException = (RetryWithException)exception; - GoneAndRetryWithRetryPolicy.this.lastRetryWithException = lastRetryWithException; - - long remainingMilliseconds = - (this.waitTimeInSeconds * 1_000L) - - GoneAndRetryWithRetryPolicy.this.getElapsedTime().toMillis(); - int currentRetryAttemptCount = this.attemptCount.getAndIncrement(); - - if (remainingMilliseconds <= 0) { - logger.warn("Received RetryWithException after backoff/retry. Will fail the request.", - lastRetryWithException); - return Mono.just(ShouldRetryResult.error(lastRetryWithException)); - } - - backoffTime = Duration.ofMillis( - Math.min( - Math.min(this.currentBackoffMilliseconds.get() + random.nextInt(RANDOM_SALT_IN_MS), remainingMilliseconds), - RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS)); - - this.currentBackoffMilliseconds.set( - Math.max( - RetryWithRetryPolicy.INITIAL_BACKOFF_TIME_MS, - Math.min( - RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS, - this.currentBackoffMilliseconds.get() * RetryWithRetryPolicy.BACK_OFF_MULTIPLIER)) - ); - - logger.debug("BackoffTime: {} ms.", backoffTime.toMillis()); - - // Calculate the remaining time based after accounting for the backoff that we - // will perform - long timeoutInMillSec = remainingMilliseconds - backoffTime.toMillis(); - timeout = timeoutInMillSec > 0 ? Duration.ofMillis(timeoutInMillSec) - : Duration.ofMillis(RetryWithRetryPolicy.MAXIMUM_BACKOFF_TIME_IN_MS); - - logger.debug("Received RetryWithException, will retry, ", exception); - - // For RetryWithException, prevent the caller - // from refreshing any caches. - return Mono.just(ShouldRetryResult.retryAfter(backoffTime, - Quadruple.with(false, true, timeout, currentRetryAttemptCount))); - } - - @Override - public RetryContext getRetryContext() { - return this.retryContext; - } - } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aad/RecordingClientHttpRequestFactoryBuilderConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aad/RecordingClientHttpRequestFactoryBuilderConfiguration.java new file mode 100644 index 000000000000..87815254df89 --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aad/RecordingClientHttpRequestFactoryBuilderConfiguration.java @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.spring.cloud.autoconfigure.implementation.aad; + +import org.springframework.boot.http.client.ClientHttpRequestFactoryBuilder; +import org.springframework.boot.http.client.HttpClientSettings; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.ClientHttpRequestFactory; + +@Configuration(proxyBeanMethods = false) +public class RecordingClientHttpRequestFactoryBuilderConfiguration { + + @Bean + public RecordingClientHttpRequestFactoryBuilder recordingClientHttpRequestFactoryBuilder() { + return new RecordingClientHttpRequestFactoryBuilder(ClientHttpRequestFactoryBuilder.detect()); + } + + public static final class RecordingClientHttpRequestFactoryBuilder + implements ClientHttpRequestFactoryBuilder { + + private final ClientHttpRequestFactoryBuilder delegate; + private HttpClientSettings clientSettings; + + RecordingClientHttpRequestFactoryBuilder( + ClientHttpRequestFactoryBuilder delegate) { + this.delegate = delegate; + } + + @Override + public ClientHttpRequestFactory build(HttpClientSettings settings) { + this.clientSettings = settings; + return this.delegate.build(settings); + } + + public HttpClientSettings getClientSettings() { + return this.clientSettings; + } + } +} diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aad/configuration/AadResourceServerConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aad/configuration/AadResourceServerConfigurationTests.java index bb8cc538e012..158a8602ca66 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aad/configuration/AadResourceServerConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aad/configuration/AadResourceServerConfigurationTests.java @@ -3,20 +3,24 @@ package com.azure.spring.cloud.autoconfigure.implementation.aad.configuration; import com.azure.identity.extensions.implementation.template.AzureAuthenticationTemplate; +import com.azure.spring.cloud.autoconfigure.implementation.aad.RecordingClientHttpRequestFactoryBuilderConfiguration; +import com.azure.spring.cloud.autoconfigure.implementation.aad.RecordingClientHttpRequestFactoryBuilderConfiguration.RecordingClientHttpRequestFactoryBuilder; import com.azure.spring.cloud.autoconfigure.implementation.aad.configuration.properties.AadAuthenticationProperties; -import com.azure.spring.cloud.autoconfigure.implementation.aad.security.jwt.AadJwtIssuerValidator; import com.azure.spring.cloud.autoconfigure.implementation.aad.security.AadResourceServerHttpSecurityConfigurer; +import com.azure.spring.cloud.autoconfigure.implementation.aad.security.jwt.AadJwtIssuerValidator; import com.azure.spring.cloud.autoconfigure.implementation.context.AzureGlobalPropertiesAutoConfiguration; import com.nimbusds.jose.jwk.source.JWKSourceBuilder; import com.nimbusds.jwt.proc.JWTClaimsSetAwareJWSKeySelector; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.http.client.HttpClientSettings; import org.springframework.boot.http.converter.autoconfigure.HttpMessageConvertersAutoConfiguration; import org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLoggingListener; import org.springframework.boot.restclient.autoconfigure.RestTemplateAutoConfiguration; import org.springframework.boot.logging.LogLevel; import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.runner.WebApplicationContextRunner; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.core.convert.converter.Converter; import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity; @@ -72,6 +76,7 @@ void testCreateJwtDecoderByJwkKeySetUri() { @Test void testJwtDecoderTimeoutDefaultValues() { resourceServerContextRunner() + .withUserConfiguration(RecordingClientHttpRequestFactoryBuilderConfiguration.class) .withPropertyValues("spring.cloud.azure.active-directory.enabled=true") .run(context -> { AadAuthenticationProperties properties = context.getBean(AadAuthenticationProperties.class); @@ -81,7 +86,7 @@ void testJwtDecoderTimeoutDefaultValues() { .isEqualTo(Duration.ofMillis(JWKSourceBuilder.DEFAULT_HTTP_READ_TIMEOUT)); // Verify the default timeouts are applied to the RestTemplate used by the JwtDecoder final JwtDecoder jwtDecoder = context.getBean(JwtDecoder.class); - verifyJwtDecoderRestTemplateTimeouts(jwtDecoder, + verifyJwtDecoderRestTemplateTimeouts(context, jwtDecoder, JWKSourceBuilder.DEFAULT_HTTP_CONNECT_TIMEOUT, JWKSourceBuilder.DEFAULT_HTTP_READ_TIMEOUT); }); @@ -90,6 +95,7 @@ void testJwtDecoderTimeoutDefaultValues() { @Test void testJwtDecoderTimeoutCustomValues() { resourceServerContextRunner() + .withUserConfiguration(RecordingClientHttpRequestFactoryBuilderConfiguration.class) .withPropertyValues( "spring.cloud.azure.active-directory.enabled=true", "spring.cloud.azure.active-directory.jwt-connect-timeout=2000", @@ -103,7 +109,7 @@ void testJwtDecoderTimeoutCustomValues() { assertThat(jwtDecoder).isNotNull(); assertThat(jwtDecoder).isExactlyInstanceOf(NimbusJwtDecoder.class); // Verify the configured timeouts are applied to the RestTemplate used by the JwtDecoder - verifyJwtDecoderRestTemplateTimeouts(jwtDecoder, 2000, 3000); + verifyJwtDecoderRestTemplateTimeouts(context, jwtDecoder, 2000, 3000); }); } @@ -410,7 +416,8 @@ public Collection convert(Jwt source) { * has the expected connect and read timeouts applied to its ClientHttpRequestFactory. */ @SuppressWarnings("unchecked") - private static void verifyJwtDecoderRestTemplateTimeouts(JwtDecoder jwtDecoder, + private static void verifyJwtDecoderRestTemplateTimeouts(ApplicationContext context, + JwtDecoder jwtDecoder, int expectedConnectTimeoutMs, int expectedReadTimeoutMs) { // NimbusJwtDecoder -> jwtProcessor (DefaultJWTProcessor) @@ -443,13 +450,18 @@ private static void verifyJwtDecoderRestTemplateTimeouts(JwtDecoder jwtDecoder, // RestTemplate -> ClientHttpRequestFactory org.springframework.http.client.ClientHttpRequestFactory requestFactory = ((org.springframework.web.client.RestTemplate) restOperations).getRequestFactory(); + assertThat(requestFactory).isNotNull(); + + assertRecordedHttpClientSettings(context, expectedConnectTimeoutMs, expectedReadTimeoutMs); + } - // Verify timeouts on the request factory (may be stored as Duration or int) - Object connectTimeoutValue = ReflectionTestUtils.getField(requestFactory, "connectTimeout"); - Object readTimeoutValue = ReflectionTestUtils.getField(requestFactory, "readTimeout"); - int connectTimeout = connectTimeoutValue instanceof java.time.Duration d ? (int) d.toMillis() : (int) connectTimeoutValue; - int readTimeout = readTimeoutValue instanceof java.time.Duration d ? (int) d.toMillis() : (int) readTimeoutValue; - assertThat(connectTimeout).isEqualTo(expectedConnectTimeoutMs); - assertThat(readTimeout).isEqualTo(expectedReadTimeoutMs); + private static void assertRecordedHttpClientSettings(ApplicationContext context, + int expectedConnectTimeoutMs, + int expectedReadTimeoutMs) { + HttpClientSettings clientSettings = context.getBean(RecordingClientHttpRequestFactoryBuilder.class) + .getClientSettings(); + assertThat(clientSettings).isNotNull(); + assertThat(clientSettings.connectTimeout()).isEqualTo(Duration.ofMillis(expectedConnectTimeoutMs)); + assertThat(clientSettings.readTimeout()).isEqualTo(Duration.ofMillis(expectedReadTimeoutMs)); } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aadb2c/configuration/AadB2cResourceServerAutoConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aadb2c/configuration/AadB2cResourceServerAutoConfigurationTests.java index 517abd3b0e0f..cd100b48b9d9 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aadb2c/configuration/AadB2cResourceServerAutoConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/aadb2c/configuration/AadB2cResourceServerAutoConfigurationTests.java @@ -3,6 +3,8 @@ package com.azure.spring.cloud.autoconfigure.implementation.aadb2c.configuration; import com.azure.spring.cloud.autoconfigure.implementation.aadb2c.AadB2cConstants; +import com.azure.spring.cloud.autoconfigure.implementation.aad.RecordingClientHttpRequestFactoryBuilderConfiguration; +import com.azure.spring.cloud.autoconfigure.implementation.aad.RecordingClientHttpRequestFactoryBuilderConfiguration.RecordingClientHttpRequestFactoryBuilder; import com.azure.spring.cloud.autoconfigure.implementation.aad.security.jwt.AadIssuerJwsKeySelector; import com.azure.spring.cloud.autoconfigure.implementation.aad.security.jwt.AadTrustedIssuerRepository; import com.azure.spring.cloud.autoconfigure.implementation.aadb2c.configuration.conditions.AadB2cConditions; @@ -20,6 +22,7 @@ import org.mockito.Mockito; import org.springframework.beans.BeanUtils; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.http.client.HttpClientSettings; import org.springframework.boot.http.converter.autoconfigure.HttpMessageConvertersAutoConfiguration; import org.springframework.boot.restclient.autoconfigure.RestTemplateAutoConfiguration; import org.springframework.boot.test.context.FilteredClassLoader; @@ -145,22 +148,25 @@ void testB2COnlyResourceServerBean() { @Test void testB2CTimeoutDefaultValues() { - getResourceServerContextRunner().run(context -> { - AadB2cProperties properties = context.getBean(AadB2cProperties.class); - assertThat(properties.getJwtConnectTimeout()) - .isEqualTo(Duration.ofMillis(JWKSourceBuilder.DEFAULT_HTTP_CONNECT_TIMEOUT)); - assertThat(properties.getJwtReadTimeout()) - .isEqualTo(Duration.ofMillis(JWKSourceBuilder.DEFAULT_HTTP_READ_TIMEOUT)); - // Verify the default timeouts are applied to the RestTemplate used by the ResourceRetriever - verifyResourceRetrieverRestTemplateTimeouts(context, - JWKSourceBuilder.DEFAULT_HTTP_CONNECT_TIMEOUT, - JWKSourceBuilder.DEFAULT_HTTP_READ_TIMEOUT); - }); + getResourceServerContextRunner() + .withUserConfiguration(RecordingClientHttpRequestFactoryBuilderConfiguration.class) + .run(context -> { + AadB2cProperties properties = context.getBean(AadB2cProperties.class); + assertThat(properties.getJwtConnectTimeout()) + .isEqualTo(Duration.ofMillis(JWKSourceBuilder.DEFAULT_HTTP_CONNECT_TIMEOUT)); + assertThat(properties.getJwtReadTimeout()) + .isEqualTo(Duration.ofMillis(JWKSourceBuilder.DEFAULT_HTTP_READ_TIMEOUT)); + // Verify the default timeouts are applied to the RestTemplate used by the ResourceRetriever + verifyResourceRetrieverRestTemplateTimeouts(context, + JWKSourceBuilder.DEFAULT_HTTP_CONNECT_TIMEOUT, + JWKSourceBuilder.DEFAULT_HTTP_READ_TIMEOUT); + }); } @Test void testB2CTimeoutCustomValues() { getResourceServerContextRunner() + .withUserConfiguration(RecordingClientHttpRequestFactoryBuilderConfiguration.class) .withPropertyValues( "spring.cloud.azure.active-directory.b2c.jwt-connect-timeout=2000", "spring.cloud.azure.active-directory.b2c.jwt-read-timeout=3000") @@ -353,15 +359,18 @@ private static void verifyResourceRetrieverRestTemplateTimeouts(ApplicationConte // RestTemplate -> ClientHttpRequestFactory org.springframework.http.client.ClientHttpRequestFactory requestFactory = ((org.springframework.web.client.RestTemplate) restOperations).getRequestFactory(); + assertThat(requestFactory).isNotNull(); + + assertRecordedHttpClientSettings(context, expectedConnectTimeoutMs, expectedReadTimeoutMs); + } - // Verify timeouts on the request factory (may be stored as Duration or int) - Object connectTimeoutValue = org.springframework.test.util.ReflectionTestUtils - .getField(requestFactory, "connectTimeout"); - Object readTimeoutValue = org.springframework.test.util.ReflectionTestUtils - .getField(requestFactory, "readTimeout"); - int connectTimeout = connectTimeoutValue instanceof java.time.Duration d ? (int) d.toMillis() : (int) connectTimeoutValue; - int readTimeout = readTimeoutValue instanceof java.time.Duration d ? (int) d.toMillis() : (int) readTimeoutValue; - assertThat(connectTimeout).isEqualTo(expectedConnectTimeoutMs); - assertThat(readTimeout).isEqualTo(expectedReadTimeoutMs); + private static void assertRecordedHttpClientSettings(ApplicationContext context, + int expectedConnectTimeoutMs, + int expectedReadTimeoutMs) { + HttpClientSettings clientSettings = context.getBean(RecordingClientHttpRequestFactoryBuilder.class) + .getClientSettings(); + assertThat(clientSettings).isNotNull(); + assertThat(clientSettings.connectTimeout()).isEqualTo(Duration.ofMillis(expectedConnectTimeoutMs)); + assertThat(clientSettings.readTimeout()).isEqualTo(Duration.ofMillis(expectedReadTimeoutMs)); } }