diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 7672b9dc6f97..05cd763d2f2a 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -184,6 +184,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private volatile long _executionRunNumber = 1; private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); + private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus")); private ExecutorService _apiJobExecutor; private ExecutorService _workerJobExecutor; @@ -1378,6 +1379,7 @@ public boolean start() { @Override public boolean stop() { _heartbeatScheduler.shutdown(); + _eventBusPublisher.shutdown(); _apiJobExecutor.shutdown(); _workerJobExecutor.shutdown(); return true; @@ -1397,8 +1399,22 @@ protected AsyncJobManagerImpl() { } private void publishOnEventBus(AsyncJob job, String jobEvent) { - _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, - new Pair(job, jobEvent)); + try { + _eventBusPublisher.submit(new ManagedContextRunnable() { + @Override + protected void runInContext() { + try { + _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, + new Pair<>(job, jobEvent)); + } catch (Throwable t) { + logger.warn("Failed to publish async job event on message bus. jobId={}, jobEvent={}", + job != null ? job.getId() : null, jobEvent, t); + } + } + }); + } catch (RejectedExecutionException e) { + logger.warn("Failed to publish async job event, event bus publisher is shut down", e); + } } @Override