diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 65138e8dbd..ba61d0132e 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -1309,8 +1309,7 @@ else if (!HeaderMode.none.equals(headerMode)) { boolean shouldHaveRetried = withRetry && !useConfiguredRetryableException; assertThat(handler.getInvocationCount()) .isEqualTo( - shouldHaveRetried ? consumerProperties.getMaxAttempts() + 1 : 1); - + shouldHaveRetried ? consumerProperties.getMaxAttempts() : 1); assertThat(receivedMessage.getHeaders() .get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)) .isEqualTo(producerName.getBytes(StandardCharsets.UTF_8)); @@ -1422,7 +1421,7 @@ void defaultAutoCommitOnErrorWithDlq() throws Exception { new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8)) .isEqualTo(testMessagePayload); assertThat(handler.getInvocationCount()) - .isEqualTo(consumerProperties.getMaxAttempts() + 1); + .isEqualTo(consumerProperties.getMaxAttempts()); binderBindUnbindLatency(); dlqConsumerBinding.unbind(); consumerBinding.unbind(); @@ -1484,7 +1483,7 @@ void retriesWithoutDlq() throws Exception { // Since we don't have a DLQ, assert that we are invoking the handler exactly the same number of times // as set in consumerProperties.maxAttempt and not the default set by Spring Kafka (10 times). assertThat(handler.getInvocationCount()) - .isEqualTo(consumerProperties.getMaxAttempts() + 1); + .isEqualTo(consumerProperties.getMaxAttempts()); binderBindUnbindLatency(); consumerBinding.unbind(); producerBinding.unbind(); @@ -1609,7 +1608,7 @@ void autoCommitOnErrorWhenManualAcknowledgement() throws Exception { new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8)) .isEqualTo(testMessagePayload); assertThat(handler.getInvocationCount()) - .isEqualTo(consumerProperties.getMaxAttempts() + 1); + .isEqualTo(consumerProperties.getMaxAttempts()); binderBindUnbindLatency(); dlqConsumerBinding.unbind(); consumerBinding.unbind(); @@ -1689,7 +1688,7 @@ void configurableDlqName() throws Exception { new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8)) .isEqualTo(testMessagePayload); assertThat(handler.getInvocationCount()) - .isEqualTo(consumerProperties.getMaxAttempts() + 1); + .isEqualTo(consumerProperties.getMaxAttempts()); binderBindUnbindLatency(); dlqConsumerBinding.unbind(); consumerBinding.unbind(); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java index bb90b11b7a..4f4fb303e6 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java @@ -209,7 +209,7 @@ protected RetryTemplate buildRetryTemplate(ConsumerProperties properties) { } RetryPolicy retryPolicy = RetryPolicy.builder() - .maxRetries(properties.getMaxAttempts()) + .maxRetries(Math.max(0, properties.getMaxAttempts() - 1)) .delay(Duration.ofMillis(properties.getBackOffInitialInterval())) .multiplier(properties.getBackOffMultiplier()) .maxDelay(Duration.ofMillis(properties.getBackOffMaxInterval())) diff --git a/core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/MaxAttemptsRetryTests.java b/core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/MaxAttemptsRetryTests.java new file mode 100644 index 0000000000..afb72e3240 --- /dev/null +++ b/core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/MaxAttemptsRetryTests.java @@ -0,0 +1,49 @@ +package org.springframework.cloud.stream.binder; + +import org.junit.jupiter.api.Test; +import org.springframework.core.retry.RetryTemplate; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +class MaxAttemptsRetryTests { + + @Test + void maxAttemptsShouldNotExceedConfiguredAttempts() { + + ConsumerProperties props = new ConsumerProperties(); + props.setMaxAttempts(2); + + TestBinder binder = new TestBinder(); + + RetryTemplate retryTemplate = binder.buildRetryTemplate(props); + + AtomicInteger attempts = new AtomicInteger(); + + try { + retryTemplate.execute(() -> { + attempts.incrementAndGet(); + throw new RuntimeException("fail"); + }); + } catch (Exception ignored) { + } + + assertThat(attempts.get()).isEqualTo(2); + } + + static class TestBinder extends AbstractBinder { + + @Override + protected Binding doBindConsumer(String name, String group, Object inboundBindTarget, + ConsumerProperties consumerProperties) { + return null; + } + + @Override + protected Binding doBindProducer(String name, Object outboundBindTarget, + ProducerProperties producerProperties) { + return null; + } + } +}