Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion okapi-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@ dependencies {
implementation(libs.slf4jApi)
testImplementation(libs.kotestRunnerJunit5)
testImplementation(libs.kotestAssertionsCore)
testRuntimeOnly(libs.slf4jSimple)
// logback (not slf4j-simple) at test scope so tests can attach a ListAppender to capture
// log events from production code. Logback is the only SLF4J binding on the test classpath
// (avoids the multiple-bindings warning); production code stays free of any logging backend.
testImplementation(libs.logbackClassic)
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ class OutboxPurger @JvmOverloads constructor(
/** Reports the current value of the `running` flag. */
fun isRunning(): Boolean {
    return running.get()
}

private fun tick() {
// Counters live outside the try so the catch handler can include partial progress in
// the error log. Each batch runs in its own transaction (see do-loop body), so batches
// 0..N-1 are already durably committed when iteration N throws -- without this an
// operator sees only "Outbox purge failed" and has no way to know whether any rows
// were actually purged before the failure.
var totalDeleted = 0
var batches = 0
try {
val cutoff = clock.instant().minus(config.retention)
var totalDeleted = 0
var batches = 0
do {
val deleted = transactionRunner?.runInTransaction {
outboxStore.removeDeliveredBefore(cutoff, config.batchSize)
Expand All @@ -70,7 +75,13 @@ class OutboxPurger @JvmOverloads constructor(
logger.debug("Purged {} delivered entries in {} batches", totalDeleted, batches)
}
} catch (e: Exception) {
logger.error("Outbox purge failed, will retry at next scheduled interval", e)
logger.error(
"Outbox purge failed after {} batches ({} entries purged this tick), " +
"will retry at next scheduled interval",
batches,
totalDeleted,
e,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.softwaremill.okapi.core

import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Duration.ofDays
import java.time.Duration.ofMillis
Expand Down Expand Up @@ -115,6 +120,49 @@ class OutboxPurgerTest : FunSpec({
callCount.get() shouldBe 2
}

test("error log preserves partial batch progress on mid-loop failure") {
    // Each batch runs in its own transaction, so batches 0..N-1 are durably committed when
    // iteration N throws. The error log must surface that fact -- otherwise an operator
    // paged on "Outbox purge failed" cannot tell whether 0 or 9000 entries were purged
    // before the failure without inspecting the database directly.
    //
    // Asserts on the logger's typed arguments (Int batches, Int totalDeleted), not on the
    // formatted message text -- decoupling the regression check from log wording changes.
    val callCount = AtomicInteger(0)
    val errorLogged = CountDownLatch(1)
    // Calls 1 and 2 each report 100 removed rows; call 3 fails, so the expected partial
    // progress at the time of the failure is 2 batches / 200 entries.
    val store = stubStore(onRemove = { _, _ ->
        val count = callCount.incrementAndGet()
        if (count == 3) throw RuntimeException("simulated db failure")
        100
    })

    val purgerLogger = LoggerFactory.getLogger(OutboxPurger::class.java) as Logger
    // ListAppender that additionally releases the latch on the first ERROR event, so the
    // test thread can wait for the failure instead of sleeping.
    val appender = object : ListAppender<ILoggingEvent>() {
        override fun append(event: ILoggingEvent) {
            super.append(event)
            if (event.level == Level.ERROR) errorLogged.countDown()
        }
    }.apply { start() }
    purgerLogger.addAppender(appender)

    try {
        val purger = OutboxPurger(
            outboxStore = store,
            config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100),
            clock = fixedClock,
        )
        purger.start()
        try {
            errorLogged.await(2, TimeUnit.SECONDS) shouldBe true
        } finally {
            // Stop even when the wait times out; otherwise a failed assertion would leave
            // the started purger running for the remainder of the test run.
            purger.stop()
        }

        val errorEvent = appender.list.single { it.level == Level.ERROR }
        // Logback moves a trailing Throwable argument out of argumentArray and into
        // throwableProxy, so only the two counters are expected here.
        errorEvent.argumentArray.toList() shouldBe listOf(2, 200)
        errorEvent.throwableProxy.message shouldBe "simulated db failure"
    } finally {
        purgerLogger.detachAppender(appender)
    }
}

test("double start is ignored") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(1)
Expand Down