@@ -184,6 +184,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private volatile long _executionRunNumber = 1;

private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus"));
private ExecutorService _apiJobExecutor;
Comment on lines 186 to 188
Copilot AI Apr 7, 2026

Executors.newSingleThreadExecutor(...) uses an unbounded LinkedBlockingQueue. If _messageBus.publish() blocks for extended periods (the original issue), this queue can grow without bound, increasing memory pressure and potentially causing an OutOfMemoryError under sustained load. Consider using a bounded ThreadPoolExecutor (still single-threaded) with an explicit queue size and a rejection policy that matches the desired delivery guarantees for job events (e.g., log and drop, or CallerRunsPolicy as a backpressure mechanism).
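
A minimal sketch of the bounded variant suggested above, assuming a log-and-drop policy is acceptable for job events. The class name, the queue capacity parameter, and the `droppedEvents` counter are hypothetical; the sketch uses the default thread factory and `System.err` where the PR would presumably use `NamedThreadFactory` and `logger`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPublisherSketch {
    // Hypothetical counter, only here so drops are observable.
    public static final AtomicInteger droppedEvents = new AtomicInteger();

    // Builds a single-threaded executor whose work queue holds at most
    // queueCapacity pending tasks instead of growing without bound.
    public static ThreadPoolExecutor newBoundedSingleThreadExecutor(int queueCapacity) {
        // Log-and-drop: invoked when the queue is full (and also when a task
        // is submitted after shutdown), so no RejectedExecutionException
        // reaches the caller.
        RejectedExecutionHandler logAndDrop = (task, executor) -> {
            int dropped = droppedEvents.incrementAndGet();
            System.err.println("Event bus publisher rejected a task; total dropped: " + dropped);
        };
        return new ThreadPoolExecutor(
                1, 1,                      // exactly one worker thread
                0L, TimeUnit.MILLISECONDS, // no keep-alive needed for core threads
                new ArrayBlockingQueue<>(queueCapacity),
                logAndDrop);
    }
}
```

Because the handler swallows rejections, a `catch (RejectedExecutionException e)` around `submit(...)` would no longer fire with this policy. Swapping in `ThreadPoolExecutor.CallerRunsPolicy` gives backpressure instead of drops, but it runs the publish on the calling thread when the queue is full, which reintroduces the blocking this PR is trying to avoid.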

Copilot uses AI. Check for mistakes.
private ExecutorService _workerJobExecutor;

@@ -1378,6 +1379,7 @@ public boolean start() {
@Override
public boolean stop() {
_heartbeatScheduler.shutdown();
_eventBusPublisher.shutdown();
_apiJobExecutor.shutdown();
_workerJobExecutor.shutdown();
Comment on lines 1379 to 1384
Copilot AI Apr 7, 2026

stop() only calls _eventBusPublisher.shutdown() and immediately returns. Because NamedThreadFactory creates non-daemon threads, a slow/stuck publish can keep the JVM alive longer than expected during shutdown, and queued events may continue to run while other components are already stopping. Consider awaiting termination with a timeout and then shutdownNow() (with logging) to ensure predictable shutdown behavior.
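
The await-then-force pattern described above could look roughly like this. It is a sketch, not the PR's code: the class and method names are hypothetical, the timeout is left to the caller, and `System.err` stands in for the manager's `logger`.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {
    // Stops accepting new tasks, waits up to the given timeout for running
    // and queued tasks to finish, then forces termination.
    // Returns true only if the executor terminated within the timeout.
    public static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Interrupts the running task and returns the tasks that never started.
            List<Runnable> neverStarted = executor.shutdownNow();
            System.err.println("Executor did not terminate in time; discarded "
                    + neverStarted.size() + " queued task(s)");
            return false;
        } catch (InterruptedException e) {
            // The stopping thread itself was interrupted: force shutdown
            // and restore the interrupt flag for the caller.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In `stop()`, a call such as `shutdownAndAwait(_eventBusPublisher, 5, TimeUnit.SECONDS)` would replace the bare `shutdown()`; the five-second value is only an illustration. Note that `shutdownNow()` relies on interruption, so a publish call that ignores interrupts can still outlive the timeout.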
return true;
@@ -1397,8 +1399,22 @@ protected AsyncJobManagerImpl() {
}

private void publishOnEventBus(AsyncJob job, String jobEvent) {
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
try {
_eventBusPublisher.submit(new ManagedContextRunnable() {
@Override
protected void runInContext() {
try {
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<>(job, jobEvent));
} catch (Throwable t) {
logger.warn("Failed to publish async job event on message bus. jobId={}, jobEvent={}",
job != null ? job.getId() : null, jobEvent, t);
}
}
});
Comment on lines 1401 to +1414
Copilot AI Apr 7, 2026

publishOnEventBus now captures and publishes the live AsyncJob object asynchronously. Callers frequently mutate the same AsyncJobVO instance after calling publishOnEventBus (e.g., updateAsyncJobStatus mutates job inside the following transaction), so subscribers may observe a job state that no longer corresponds to the jobEvent being emitted (notably the "submit" event can be delivered with a job already in-progress or completed). To preserve event semantics, publish an immutable snapshot (e.g., jobId + jobEvent + selected fields copied at enqueue time, or re-fetch a fresh immutable view inside the publisher thread).
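
One way to sketch the "immutable snapshot at enqueue time" option is shown below. Everything in it is hypothetical: `MutableJob` is a simplified stand-in for `AsyncJobVO`, and `JobEventSnapshot` with its `status` field is an illustrative payload, not a type that exists in the codebase.

```java
public class JobEventSnapshotSketch {
    // Hypothetical stand-in for a mutable job entity such as AsyncJobVO.
    public static class MutableJob {
        private final long id;
        private String status;

        public MutableJob(long id, String status) {
            this.id = id;
            this.status = status;
        }

        public long getId() { return id; }
        public String getStatus() { return status; }
        public void setStatus(String status) { this.status = status; }
    }

    // Immutable view of the job, copied on the caller's thread before the
    // publish task is handed to the executor.
    public static final class JobEventSnapshot {
        private final long jobId;
        private final String status;
        private final String jobEvent;

        private JobEventSnapshot(long jobId, String status, String jobEvent) {
            this.jobId = jobId;
            this.status = status;
            this.jobEvent = jobEvent;
        }

        // Copies the selected fields now, so later mutations of the job
        // cannot change what subscribers see for this event.
        public static JobEventSnapshot of(MutableJob job, String jobEvent) {
            return new JobEventSnapshot(job.getId(), job.getStatus(), jobEvent);
        }

        public long getJobId() { return jobId; }
        public String getStatus() { return status; }
        public String getJobEvent() { return jobEvent; }
    }
}
```

The snapshot would be built in `publishOnEventBus` before calling `_eventBusPublisher.submit(...)`, and the runnable would capture the snapshot instead of `job`. Changing the published payload from `Pair<AsyncJob, String>` to a new type would also require updating existing subscribers, so this is a design trade-off rather than a drop-in fix.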
} catch (RejectedExecutionException e) {
logger.warn("Failed to publish async job event, event bus publisher is shut down", e);
}
}

@Override