Skip to content

Add async job event bus publisher#12971

Open
sureshanaparti wants to merge 4 commits intoapache:4.22from
shapeblue:async-job-event-bus-publisher
Open

Add async job event bus publisher#12971
sureshanaparti wants to merge 4 commits intoapache:4.22from
shapeblue:async-job-event-bus-publisher

Conversation

@sureshanaparti
Copy link
Copy Markdown
Contributor

Description

This PR adds async job event bus publisher, a dedicated single-threaded executor for publishing job events in AsyncJobManagerImpl to reduce API thread contention.

publishOnEventBus() was calling _messageBus.publish() synchronously on the request thread, which blocks on MessageBusBase$Gate (an exclusive mutex). JFR analysis showed this causing up to 107ms waits on Jetty request threads, contributing to 502 errors from the upstream load balancer.

This moves the event bus publishing to a dedicated single-threaded executor so API request threads are no longer blocked by Gate contention. Event ordering is preserved by the single-threaded executor.

Types of changes

  • Breaking change (fix or feature that would cause existing functionality to change)
  • New feature (non-breaking change which adds functionality)
  • Bug fix (non-breaking change which fixes an issue)
  • Enhancement (improves an existing feature and functionality)
  • Cleanup (Code refactoring and cleanup, that may add test cases)
  • Build/CI
  • Test (unit or integration test code)

Feature/Enhancement Scale or Bug Severity

Feature/Enhancement Scale

  • Major
  • Minor

Bug Severity

  • BLOCKER
  • Critical
  • Major
  • Minor
  • Trivial

Screenshots (if appropriate):

How Has This Been Tested?

How did you try to break this feature and the system with this change?

sliceofapplepie and others added 3 commits April 7, 2026 16:08
…d contention

publishOnEventBus() was calling _messageBus.publish() synchronously on the
request thread, which blocks on MessageBusBase$Gate (an exclusive mutex).
JFR analysis showed this causing up to 107ms waits on Jetty request threads,
contributing to 502 errors from the upstream load balancer.

Move event bus publishing to a dedicated single-threaded executor so API
request threads are no longer blocked by Gate contention. Event ordering
is preserved by the single-threaded executor.
…ion safety

The sole subscriber (ApiServer.handleAsyncJobPublishEvent) performs DAO reads
(getUserIncludingRemoved, getAccount, findById) inside its callback.
Without ManagedContextRunnable, the EventBus thread would not have proper
TransactionLegacy lifecycle management, risking DB connection leaks.
@codecov
Copy link
Copy Markdown

codecov bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 6.66667% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 17.60%. Comparing base (4708121) to head (97b15fc).

Files with missing lines Patch % Lines
...stack/framework/jobs/impl/AsyncJobManagerImpl.java 6.66% 14 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               4.22   #12971      +/-   ##
============================================
- Coverage     17.60%   17.60%   -0.01%     
+ Complexity    15677    15675       -2     
============================================
  Files          5918     5918              
  Lines        531681   531695      +14     
  Branches      65005    65006       +1     
============================================
- Hits          93623    93616       -7     
- Misses       427498   427518      +20     
- Partials      10560    10561       +1     
Flag Coverage Δ
uitests 3.70% <ø> (ø)
unittests 18.67% <6.66%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR offloads async job event publishing from API request threads to a dedicated single-threaded executor in AsyncJobManagerImpl, aiming to reduce Jetty request thread blocking caused by synchronous _messageBus.publish() contention while preserving event ordering.

Changes:

  • Add a dedicated single-thread executor (AsyncJobMgr-EventBus) to publish async job events.
  • Switch publishOnEventBus() to submit message bus publishes asynchronously and handle shutdown rejection.
  • Ensure the new executor is shut down when AsyncJobManagerImpl stops.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 186 to 188
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus"));
private ExecutorService _apiJobExecutor;
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executors.newSingleThreadExecutor(...) uses an unbounded LinkedBlockingQueue. If _messageBus.publish() blocks (the original issue) for extended periods, this queue can grow without bound and increase memory pressure or OOM under sustained load. Consider using a bounded ThreadPoolExecutor (still single-threaded) with an explicit queue size and a rejection policy (e.g., log+drop or CallerRuns as a backpressure mechanism) that matches desired guarantees for job event delivery.

Copilot uses AI. Check for mistakes.
Comment on lines 1379 to 1384
@Override
public boolean stop() {
_heartbeatScheduler.shutdown();
_eventBusPublisher.shutdown();
_apiJobExecutor.shutdown();
_workerJobExecutor.shutdown();
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop() only calls _eventBusPublisher.shutdown() and immediately returns. Because NamedThreadFactory creates non-daemon threads, a slow/stuck publish can keep the JVM alive longer than expected during shutdown, and queued events may continue to run while other components are already stopping. Consider awaiting termination with a timeout and then shutdownNow() (with logging) to ensure predictable shutdown behavior.

Copilot uses AI. Check for mistakes.
Comment on lines 1401 to +1409
private void publishOnEventBus(AsyncJob job, String jobEvent) {
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
try {
_eventBusPublisher.submit(new ManagedContextRunnable() {
@Override
protected void runInContext() {
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
}
});
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

publishOnEventBus now captures and publishes the live AsyncJob object asynchronously. Callers frequently mutate the same AsyncJobVO instance after calling publishOnEventBus (e.g., updateAsyncJobStatus mutates job inside the following transaction), so subscribers may observe a job state that no longer corresponds to the jobEvent being emitted (notably the "submit" event can be delivered with a job already in-progress or completed). To preserve event semantics, publish an immutable snapshot (e.g., jobId + jobEvent + selected fields copied at enqueue time, or re-fetch a fresh immutable view inside the publisher thread).

Copilot uses AI. Check for mistakes.
Comment on lines +1406 to +1407
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exceptions thrown by _messageBus.publish(...) inside the submitted task will be captured by the executor and effectively ignored because the returned Future is not observed. This can silently drop job events while the API call continues normally. Consider wrapping the publish call in a try/catch (at least Exception, ideally Throwable) within runInContext() and logging failures (optionally including jobId/jobEvent) so operational issues are visible.

Suggested change
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
try {
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
} catch (Throwable t) {
logger.warn("Failed to publish async job event on message bus. jobId={}, jobEvent={}",
job != null ? job.getId() : null, jobEvent, t);
}

Copilot uses AI. Check for mistakes.
@sureshanaparti
Copy link
Copy Markdown
Contributor Author

@blueorangutan package

@blueorangutan
Copy link
Copy Markdown

@sureshanaparti a [SL] Jenkins job has been kicked to build packages. It will be bundled with KVM, XenServer and VMware SystemVM templates. I'll keep you posted as I make progress.

@blueorangutan
Copy link
Copy Markdown

Packaging result [SF]: ✔️ el8 ✔️ el9 ✔️ el10 ✔️ debian ✔️ suse15. SL-JID 17386

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants