Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ import com.softwaremill.okapi.core.DeliveryResult
import com.softwaremill.okapi.core.OutboxEntry
import com.softwaremill.okapi.core.OutboxMessage
import io.kotest.core.spec.style.FunSpec
import io.kotest.datatest.WithDataTestName
import io.kotest.datatest.withTests
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeInstanceOf
import java.time.Instant
import kotlin.reflect.KClass

private data class StatusCodeCase(
val statusCode: Int,
val expectedType: KClass<out DeliveryResult>,
) : WithDataTestName {
override fun dataTestName() = "$statusCode -> ${expectedType.simpleName}"
}

class HttpMessageDelivererTest : FunSpec({
val wiremock = WireMockServer(wireMockConfig().dynamicPort())
Expand All @@ -33,29 +43,17 @@ class HttpMessageDelivererTest : FunSpec({
return OutboxEntry.createPending(OutboxMessage("test", """{"k":"v"}"""), info, Instant.now())
}

test("200 → Success") {
wiremock.stubFor(post(urlEqualTo("/test")).willReturn(aResponse().withStatus(200)))
deliverer.deliver(entry()) shouldBe DeliveryResult.Success
}

test("500 → RetriableFailure") {
wiremock.stubFor(post(urlEqualTo("/test")).willReturn(aResponse().withStatus(500)))
deliverer.deliver(entry()).shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
}

test("429 → RetriableFailure") {
wiremock.stubFor(post(urlEqualTo("/test")).willReturn(aResponse().withStatus(429)))
deliverer.deliver(entry()).shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
}

test("408 → RetriableFailure") {
wiremock.stubFor(post(urlEqualTo("/test")).willReturn(aResponse().withStatus(408)))
deliverer.deliver(entry()).shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
}

test("400 → PermanentFailure") {
wiremock.stubFor(post(urlEqualTo("/test")).willReturn(aResponse().withStatus(400)))
deliverer.deliver(entry()).shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
context("status code mapping") {
withTests(
StatusCodeCase(200, DeliveryResult.Success::class),
StatusCodeCase(500, DeliveryResult.RetriableFailure::class),
StatusCodeCase(429, DeliveryResult.RetriableFailure::class),
StatusCodeCase(408, DeliveryResult.RetriableFailure::class),
StatusCodeCase(400, DeliveryResult.PermanentFailure::class),
) { (statusCode, expectedType) ->
wiremock.stubFor(post(urlEqualTo("/test")).willReturn(aResponse().withStatus(statusCode)))
deliverer.deliver(entry())::class shouldBe expectedType
}
}

test("custom retriable codes") {
Expand Down Expand Up @@ -85,7 +83,7 @@ class HttpMessageDelivererTest : FunSpec({
wiremock.verify(postRequestedFor(urlEqualTo("/test")).withHeader("Content-Type", equalTo("text/plain")))
}

test("connection error RetriableFailure") {
test("connection error -> RetriableFailure") {
wiremock.stubFor(
post(urlEqualTo("/test"))
.willReturn(aResponse().withFault(com.github.tomakehurst.wiremock.http.Fault.CONNECTION_RESET_BY_PEER)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.softwaremill.okapi.core.OutboxStatus
import com.softwaremill.okapi.core.OutboxStore
import com.softwaremill.okapi.core.RetryPolicy
import com.softwaremill.okapi.test.support.RecordingMessageDeliverer
import io.kotest.assertions.withClue
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.maps.shouldContain
Expand Down Expand Up @@ -102,12 +103,16 @@ fun FunSpec.concurrentClaimTests(

// Assert disjoint
val intersection = idsA.toSet().intersect(idsB.toSet())
intersection shouldHaveSize 0
withClue("Sets should be disjoint (overlap: $intersection, A claimed ${idsA.size}, B claimed ${idsB.size})") {
intersection shouldHaveSize 0
}

// Together they cover all 20 entries
val union = (idsA + idsB).toSet()
union shouldHaveSize 20
union shouldBe allIds.toSet()
withClue("Union of A (${idsA.size}) and B (${idsB.size}) should cover all ${allIds.size} entries") {
union shouldHaveSize 20
union shouldBe allIds.toSet()
}

// Let Thread A commit and finish
canCommit.countDown()
Expand Down Expand Up @@ -146,11 +151,15 @@ fun FunSpec.concurrentClaimTests(

// Verify no amplification
recorder.assertNoAmplification()
recorder.deliveryCount() shouldBe 50
withClue("Expected exactly 50 unique deliveries from 5 concurrent processors, got ${recorder.deliveryCount()}") {
recorder.deliveryCount() shouldBe 50
}

// Verify DB state
val counts = transaction { store.countByStatuses() }
counts shouldContain (OutboxStatus.DELIVERED to 50L)
counts shouldContain (OutboxStatus.PENDING to 0L)
withClue("DB state after concurrent processing: $counts") {
counts shouldContain (OutboxStatus.DELIVERED to 50L)
counts shouldContain (OutboxStatus.PENDING to 0L)
}
}
}