Clean up stopped FSM transition listeners#3096
Conversation
He-Pin
left a comment
There was a problem hiding this comment.
Review: LGTM with minor suggestions
The approach is clean and correct for the common case. Replacing the old listeners.add() with DeathWatch-based cleanup properly addresses the TODO comments and fixes the listener leak on actor stop. The watchedListeners tracking to distinguish subscription-path watches from user-path watches is a good design choice.
Two improvement suggestions posted as inline comments — neither blocks merge.
Verified:
addListener/removeListener/ListenerTerminatedhandler maintainlistenersandwatchedListenersconsistentlyremoveListenercorrectly avoidsunwatchfor actors watched by user logic (not inwatchedListeners)isWatchingguard preventsIllegalStateExceptionfrom mixingwatchandwatchWith- DeathWatch automatically removes from
watchingon termination, soListenerTerminatedhandler doesn't needunwatch - Tests cover both Classic FSM and PersistentFSM cleanup paths
| */ | ||
| private var transitionEvent: List[TransitionHandler] = Nil | ||
| private val watchedListeners: mutable.Set[ActorRef] = mutable.Set.empty | ||
| private def handleTransition(prev: S, next: S): Unit = { |
There was a problem hiding this comment.
🟡 Suggestion: listener leak when FSM already watches the listener actor
When isWatching(actorRef) returns true (the FSM's business logic already called context.watch() on this actor), watchWith is skipped and the actor is not added to watchedListeners. If that actor later terminates, the FSM receives a plain Terminated instead of ListenerTerminated, and the listener is never removed from listeners.
Adding a case Terminated(actorRef) => listeners.remove(actorRef) handler to the receive block would be a breaking change (it would intercept Terminated before it reaches user state functions via processMsg). So this edge case may be acceptable as-is.
Consider adding a log warning when isWatching is true at subscription time, so users know the listener won't be auto-cleaned:
private def addListener(actorRef: ActorRef): Unit = {
if (listeners.add(actorRef)) {
if (!context.asInstanceOf[ActorCell].isWatching(actorRef)) {
context.watchWith(actorRef, ListenerTerminated(actorRef))
watchedListeners += actorRef
} else {
log.warning(
"Listener [{}] is already watched by this FSM; it will not be " +
"automatically removed on termination. Use UnsubscribeTransitionCallBack explicitly.",
actorRef)
}
}
}Not blocking — the common case is handled correctly.
| @@ -482,11 +481,25 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging | |||
| * transition handling | |||
There was a problem hiding this comment.
🟡 Suggestion: code duplication with FSM.scala
The watchedListeners field, addListener, and removeListener are duplicated nearly identically between FSM.scala and PersistentFSMBase.scala. This is understandable given they share the Listeners trait but not a common base beyond that.
Not blocking for this PR — but consider extracting this into a shared mixin (e.g. WatchedListeners) in a follow-up, to avoid divergent fixes if this logic needs to change later.
Also, ListenerTerminated visibility differs: private in FSM companion vs private[persistence] in PersistentFSM companion. This is fine since each module only accesses its own companion's case class, but worth aligning if the shared mixin approach is taken.
abbb62e to
b141435
Compare
| * transition handling | ||
| */ | ||
| private var transitionEvent: List[TransitionHandler] = Nil | ||
| private val watchedListeners: mutable.Set[ActorRef] = mutable.Set.empty |
There was a problem hiding this comment.
Why not a var with an immutable Set? Like the transitionEvent list above.
There was a problem hiding this comment.
Save ygc, I think? But I agree with you.
|
|
||
| private def addListener(actorRef: ActorRef): Unit = { | ||
| if (listeners.add(actorRef)) { | ||
| if (!context.asInstanceOf[ActorCell].isWatching(actorRef)) { |
b141435 to
f33ba0d
Compare
|
|
||
| private def addListener(actorRef: ActorRef): Unit = { | ||
| if (listeners.add(actorRef)) { | ||
| if (!context.asInstanceOf[ActorCell].isWatching(actorRef)) { |
f33ba0d to
8aa2b1d
Compare
pjfanning
left a comment
There was a problem hiding this comment.
mima binary compatibility issue
needs an excludes file to include the problem exclusions
[error] pekko-actor: Failed binary compatibility check against org.apache.pekko:pekko-actor_2.13:1.0.0! Found 2 potential problems (filtered 351)
[error] * abstract synthetic method org$apache$pekko$actor$FSM$$watchedListeners()scala.collection.immutable.Set in interface org.apache.pekko.actor.FSM is present only in current version
[error] filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.FSM.org$apache$pekko$actor$FSM$$watchedListeners")
[error] * abstract synthetic method org$apache$pekko$actor$FSM$$watchedListeners_=(scala.collection.immutable.Set)Unit in interface org.apache.pekko.actor.FSM is present only in current version
[error] filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.FSM.org$apache$pekko$actor$FSM$$watchedListeners_=")
[error] pekko-persistence: Failed binary compatibility check against org.apache.pekko:pekko-persistence_2.13:1.0.0! Found 2 potential problems (filtered 134)
[error] * abstract synthetic method org$apache$pekko$persistence$fsm$PersistentFSMBase$$watchedListeners()scala.collection.immutable.Set in interface org.apache.pekko.persistence.fsm.PersistentFSMBase is present only in current version
[error] filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.fsm.PersistentFSMBase.org$apache$pekko$persistence$fsm$PersistentFSMBase$$watchedListeners")
[error] * abstract synthetic method org$apache$pekko$persistence$fsm$PersistentFSMBase$$watchedListeners_=(scala.collection.immutable.Set)Unit in interface org.apache.pekko.persistence.fsm.PersistentFSMBase is present only in current version
[error] filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.fsm.PersistentFSMBase.org$apache$pekko$persistence$fsm$PersistentFSMBase$$watchedListeners_=")
[error] java.lang.RuntimeException: Failed binary compatibility check against org.apache.pekko:pekko-persistence_2.13:1.0.0! Found 2 potential problems (filtered 134)
Summary
Classic FSM and PersistentFSM keep transition listeners in a listener set until they explicitly unsubscribe. This changes the subscription path to DeathWatch listeners with an internal cleanup message, so stopped listener actors are removed automatically.
The explicit unsubscribe/deafen paths still remove listeners, and they now also unwatch only DeathWatch registrations created by the listener subscription path. If an FSM was already watching the same actor for its own logic, this change does not replace that existing watch.
Tests
actor-tests/testOnly org.apache.pekko.actor.FSMTransitionSpecpersistence/testOnly org.apache.pekko.persistence.fsm.InmemPersistentFSMSpecdocs/paradox