Add async job event bus publisher #12971
…d contention publishOnEventBus() was calling _messageBus.publish() synchronously on the request thread, which blocks on MessageBusBase$Gate (an exclusive mutex). JFR analysis showed this causing up to 107ms waits on Jetty request threads, contributing to 502 errors from the upstream load balancer. Move event bus publishing to a dedicated single-threaded executor so API request threads are no longer blocked by Gate contention. Event ordering is preserved by the single-threaded executor.
…ion safety The sole subscriber (ApiServer.handleAsyncJobPublishEvent) performs DAO reads (getUserIncludingRemoved, getAccount, findById) inside its callback. Without ManagedContextRunnable, the EventBus thread would not have proper TransactionLegacy lifecycle management, risking DB connection leaks.
Codecov Report❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## 4.22 #12971 +/- ##
============================================
- Coverage 17.60% 17.60% -0.01%
+ Complexity 15677 15675 -2
============================================
Files 5918 5918
Lines 531681 531695 +14
Branches 65005 65006 +1
============================================
- Hits 93623 93616 -7
- Misses 427498 427518 +20
- Partials 10560 10561 +1
Pull request overview
This PR offloads async job event publishing from API request threads to a dedicated single-threaded executor in AsyncJobManagerImpl, aiming to reduce Jetty request thread blocking caused by synchronous _messageBus.publish() contention while preserving event ordering.
Changes:
- Add a dedicated single-thread executor (AsyncJobMgr-EventBus) to publish async job events.
- Switch publishOnEventBus() to submit message bus publishes asynchronously and handle shutdown rejection.
- Ensure the new executor is shut down when AsyncJobManagerImpl stops.
      private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
+     private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus"));
      private ExecutorService _apiJobExecutor;
Executors.newSingleThreadExecutor(...) uses an unbounded LinkedBlockingQueue. If _messageBus.publish() blocks (the original issue) for extended periods, this queue can grow without bound and increase memory pressure or OOM under sustained load. Consider using a bounded ThreadPoolExecutor (still single-threaded) with an explicit queue size and a rejection policy (e.g., log+drop or CallerRuns as a backpressure mechanism) that matches desired guarantees for job event delivery.
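One way the bounded alternative described in this comment might look is sketched below. It is not code from the PR: the class name BoundedPublisherSketch, the DROPPED counter, and the log-and-drop policy are illustrative assumptions, and whether dropping events is acceptable depends on the delivery guarantees wanted for job events.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a single-threaded executor with a bounded queue.
final class BoundedPublisherSketch {
    // Illustrative counter of events rejected because the queue was full.
    static final AtomicInteger DROPPED = new AtomicInteger();

    static ThreadPoolExecutor newBoundedSingleThreadExecutor(int queueCapacity) {
        // Assumed policy: log and drop. ThreadPoolExecutor.CallerRunsPolicy
        // would instead push the work back onto the submitting thread.
        RejectedExecutionHandler logAndDrop = (task, executor) -> {
            DROPPED.incrementAndGet();
            System.err.println("Event queue full or executor stopped; dropping event");
        };
        // core = max = 1 keeps a single worker, so submission order is kept.
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), logAndDrop);
    }

    // Blocks the single worker, submits `events` no-op tasks, and returns
    // how many of them were rejected by the bounded queue.
    static int demoDrops(int queueCapacity, int events) {
        DROPPED.set(0);
        ThreadPoolExecutor publisher = newBoundedSingleThreadExecutor(queueCapacity);
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            publisher.execute(() -> {
                workerBusy.countDown();
                try {
                    release.await(); // simulates a publish stuck on a contended lock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();
            for (int i = 0; i < events; i++) {
                publisher.execute(() -> { });
            }
            release.countDown();
            publisher.shutdown();
            publisher.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            publisher.shutdownNow();
        }
        return DROPPED.get();
    }
}
```

With a capacity of 2 and 5 events submitted while the worker is blocked, two events are queued and three are rejected, so memory use stays bounded at the cost of lost events.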
      @Override
      public boolean stop() {
          _heartbeatScheduler.shutdown();
+         _eventBusPublisher.shutdown();
          _apiJobExecutor.shutdown();
          _workerJobExecutor.shutdown();
stop() only calls _eventBusPublisher.shutdown() and immediately returns. Because NamedThreadFactory creates non-daemon threads, a slow/stuck publish can keep the JVM alive longer than expected during shutdown, and queued events may continue to run while other components are already stopping. Consider awaiting termination with a timeout and then shutdownNow() (with logging) to ensure predictable shutdown behavior.
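The await-then-force pattern suggested here could be sketched roughly as follows. The helper name shutdownAndAwait and the class GracefulShutdownSketch are hypothetical, and the timeout is left to the caller; the PR itself does not contain this code.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: orderly shutdown first, forced shutdown on timeout.
final class GracefulShutdownSketch {
    // Returns true if the executor drained within the timeout,
    // false if it had to be forced or the wait was interrupted.
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks, let queued ones run
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Interrupts the running task and returns the tasks that never started.
            List<Runnable> neverRun = executor.shutdownNow();
            System.err.println("Forced executor shutdown; " + neverRun.size()
                    + " queued task(s) discarded");
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

A stop() method could call such a helper for the event bus executor with a short timeout. Note that shutdownNow() only interrupts the worker, so a publish that ignores interruption would still keep a non-daemon thread alive.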
      private void publishOnEventBus(AsyncJob job, String jobEvent) {
-         _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
-             new Pair<AsyncJob, String>(job, jobEvent));
+         try {
+             _eventBusPublisher.submit(new ManagedContextRunnable() {
+                 @Override
+                 protected void runInContext() {
+                     _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+                         new Pair<AsyncJob, String>(job, jobEvent));
+                 }
+             });
publishOnEventBus now captures and publishes the live AsyncJob object asynchronously. Callers frequently mutate the same AsyncJobVO instance after calling publishOnEventBus (e.g., updateAsyncJobStatus mutates job inside the following transaction), so subscribers may observe a job state that no longer corresponds to the jobEvent being emitted (notably the "submit" event can be delivered with a job already in-progress or completed). To preserve event semantics, publish an immutable snapshot (e.g., jobId + jobEvent + selected fields copied at enqueue time, or re-fetch a fresh immutable view inside the publisher thread).
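The snapshot idea can be illustrated with a small self-contained sketch. MutableJob is a hypothetical stand-in for AsyncJobVO and JobEventSnapshot is an invented type; the fields copied here are assumptions, not the set a real subscriber would need.

```java
// Hypothetical sketch: copy the fields a subscriber needs at enqueue time.
final class JobEventSnapshotSketch {
    // Stand-in for a mutable job entity such as AsyncJobVO (assumption).
    static final class MutableJob {
        final long id;
        String status;

        MutableJob(long id, String status) {
            this.id = id;
            this.status = status;
        }
    }

    // Immutable view of the job as it was when the event was raised.
    static final class JobEventSnapshot {
        final long jobId;
        final String status;
        final String jobEvent;

        private JobEventSnapshot(long jobId, String status, String jobEvent) {
            this.jobId = jobId;
            this.status = status;
            this.jobEvent = jobEvent;
        }

        static JobEventSnapshot of(MutableJob job, String jobEvent) {
            return new JobEventSnapshot(job.id, job.status, jobEvent);
        }
    }

    // Shows that later mutation of the job does not leak into the snapshot.
    static String demo() {
        MutableJob job = new MutableJob(42L, "SUBMITTED");
        JobEventSnapshot snapshot = JobEventSnapshot.of(job, "submit");
        job.status = "COMPLETED"; // caller mutates the job after enqueueing
        return snapshot.status + "/" + job.status;
    }
}
```

Here demo() returns "SUBMITTED/COMPLETED": the snapshot still reflects the state that matched the "submit" event, while the live object has moved on. Publishing the live object asynchronously would expose the second value to subscribers instead.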
+                     _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+                         new Pair<AsyncJob, String>(job, jobEvent));
Exceptions thrown by _messageBus.publish(...) inside the submitted task will be captured by the executor and effectively ignored because the returned Future is not observed. This can silently drop job events while the API call continues normally. Consider wrapping the publish call in a try/catch (at least Exception, ideally Throwable) within runInContext() and logging failures (optionally including jobId/jobEvent) so operational issues are visible.
Suggested change
-                     _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
-                         new Pair<AsyncJob, String>(job, jobEvent));
+                     try {
+                         _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+                             new Pair<AsyncJob, String>(job, jobEvent));
+                     } catch (Throwable t) {
+                         logger.warn("Failed to publish async job event on message bus. jobId={}, jobEvent={}",
+                             job != null ? job.getId() : null, jobEvent, t);
+                     }
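The underlying behavior of ExecutorService.submit() can be reproduced in isolation. The sketch below uses only java.util.concurrent; the class name and the "publish failed" message are illustrative, and the failing Runnable merely stands in for a publish call that throws.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: submit() stores a task's exception in its Future.
final class SwallowedExceptionSketch {
    // Returns the failure message, which is visible only via Future.get().
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for a publish call that throws inside the task.
            Runnable failingPublish = () -> {
                throw new IllegalStateException("publish failed");
            };
            Future<?> result = executor.submit(failingPublish);
            try {
                result.get(5, TimeUnit.SECONDS);
                return "no exception";
            } catch (ExecutionException e) {
                // If nobody calls get(), this failure is never reported.
                return e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (TimeoutException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

Because publishOnEventBus() discards the returned Future, the try/catch inside the task is the only place where such a failure would be logged.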
@blueorangutan package
@sureshanaparti a [SL] Jenkins job has been kicked to build packages. It will be bundled with KVM, XenServer and VMware SystemVM templates. I'll keep you posted as I make progress.
Packaging result [SF]: ✔️ el8 ✔️ el9 ✔️ el10 ✔️ debian ✔️ suse15. SL-JID 17386
Description
This PR adds an async job event bus publisher: a dedicated single-threaded executor in AsyncJobManagerImpl that publishes job events off the API request threads, reducing API thread contention.
publishOnEventBus() was calling _messageBus.publish() synchronously on the request thread, which blocks on MessageBusBase$Gate (an exclusive mutex). JFR analysis showed this causing up to 107ms waits on Jetty request threads, contributing to 502 errors from the upstream load balancer.
This moves the event bus publishing to a dedicated single-threaded executor so API request threads are no longer blocked by Gate contention. Event ordering is preserved by the single-threaded executor.
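The ordering claim rests on a general property of single-threaded executors: tasks run one at a time in the order they were queued. A minimal sketch, with no CloudStack types and with hypothetical names throughout, might look like this:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: hand events to one worker thread, keep their order.
final class OrderedAsyncPublishSketch {
    // Enqueues events 0..n-1 from the calling thread and returns them in
    // the order the single worker thread delivered them.
    static List<Integer> publishAll(int n) {
        ExecutorService publisher = Executors.newSingleThreadExecutor();
        List<Integer> delivered = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < n; i++) {
                final int event = i;
                // Returns immediately; the caller does not wait for delivery.
                publisher.execute(() -> delivered.add(event));
            }
            publisher.shutdown();
            if (!publisher.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("publisher did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            publisher.shutdownNow();
        }
        return delivered;
    }
}
```

The guarantee covers the order in which tasks reach the queue. Events submitted concurrently from several request threads are delivered in whatever order those threads happened to enqueue them.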