From 03c4e59c158243ec325d45e7f5c8290763655ce6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Thu, 9 Apr 2026 17:23:13 +0200 Subject: [PATCH 1/3] feat: add DelayedJobsRecover scheduled job to re-enqueue stuck service operations Introduces a new periodic recovery job that scans permanently failed delayed_jobs and re-enqueues polling for service operations still in progress at the broker. Recovers cases where a transient DB connection error caused the polling job to fail permanently (max_attempts=1) while the broker operation was still running, leaving the service instance stuck in 'in progress' with no active poller. --- app/jobs/runtime/delayed_jobs_recover.rb | 91 +++++++++++++++++++ config/cloud_controller.yml | 3 +- lib/cloud_controller/clock/scheduler.rb | 3 +- .../config_schemas/clock_schema.rb | 3 + lib/cloud_controller/jobs.rb | 1 + lib/tasks/jobs.rake | 1 + 6 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 app/jobs/runtime/delayed_jobs_recover.rb diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb new file mode 100644 index 00000000000..6caa2a11397 --- /dev/null +++ b/app/jobs/runtime/delayed_jobs_recover.rb @@ -0,0 +1,91 @@ +module VCAP::CloudController + module Jobs + module Runtime + class DelayedJobsRecover < VCAP::CloudController::Jobs::CCJob + RECOVERABLE_OPERATIONS = %w[ + service_instance.create + ].freeze + + def perform + logger.info('Recover halted delayed jobs') + recover + end + + def max_attempts + 1 + end + + private + + def recover + # find delayed jobs where failed_at is set (permanently failed) + # and still within the max polling duration (not expired) + cutoff_time = Time.now - default_maximum_duration_seconds + dead_delayed_jobs = Delayed::Job. + exclude(failed_at: nil). + where { created_at > cutoff_time }. + order(:created_at). 
+ limit(batch_size) + + dead_delayed_jobs.each do |delayed| + # pollable job state can be POLLING or FAILED depending on whether the failure + # hook managed to persist before the db connection was lost + pollable = PollableJobModel.where(delayed_job_guid: delayed.guid). + where(state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). + first + next unless pollable + next unless RECOVERABLE_OPERATIONS.include?(pollable.operation) + + # last_operation.state must be 'in progress'. This confirms the broker is still + # working on the operation and CC is the one that gave up, not the broker + entity = find_entity(pollable) + next unless entity + next unless entity.last_operation&.state == 'in progress' + + reenqueue(pollable, delayed) + end + end + + def find_entity(pollable) + # TODO: resource_type field can be used + case pollable.operation + when 'service_instance.create' + ManagedServiceInstance.first(guid: pollable.resource_guid) + end + end + + def reenqueue(pollable, delayed) + # re-verify atomically that the pollable job still points to this dead delayed_job. + # if another process already re-enqueued a new job, pollable.delayed_job_guid was + # updated to the new delayed_job's guid, so where clause returns nil and we skip safely. + PollableJobModel.db.transaction do + pjob = PollableJobModel.where(guid: pollable.guid, + delayed_job_guid: delayed.guid, + state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). 
+ for_update.first + return unless pjob + + # bring the record into a clean polling state + pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE) + + # unwrap the serialized handler and re-enqueue via the reoccurring job + inner_job = Jobs::Enqueuer.unwrap_job(delayed.payload_object) + inner_job.send(:enqueue_next_job, pjob) + end + end + + def default_maximum_duration_seconds + Config.config.get(:broker_client_max_async_poll_duration_minutes).minutes + end + + def logger + @logger ||= Steno.logger('cc.background') + end + + def batch_size + 10 + end + end + end + end +end diff --git a/config/cloud_controller.yml b/config/cloud_controller.yml index bdb1f9108f5..35ddd1d1655 100644 --- a/config/cloud_controller.yml +++ b/config/cloud_controller.yml @@ -375,7 +375,8 @@ diego_sync: pending_droplets: frequency_in_seconds: 300 expiration_in_seconds: 42 - +delayed_jobs_recover: + frequency_in_seconds: 600 pending_builds: expiration_in_seconds: 42 frequency_in_seconds: 300 diff --git a/lib/cloud_controller/clock/scheduler.rb b/lib/cloud_controller/clock/scheduler.rb index 388b5db11d5..914dc67e350 100644 --- a/lib/cloud_controller/clock/scheduler.rb +++ b/lib/cloud_controller/clock/scheduler.rb @@ -24,7 +24,8 @@ class Scheduler { name: 'pending_droplets', class: Jobs::Runtime::PendingDropletCleanup }, { name: 'pending_builds', class: Jobs::Runtime::PendingBuildCleanup }, { name: 'failed_jobs', class: Jobs::Runtime::FailedJobsCleanup }, - { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup } + { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup }, + { name: 'delayed_jobs_recover', class: Jobs::Runtime::DelayedJobsRecover } ].freeze def initialize(config) diff --git a/lib/cloud_controller/config_schemas/clock_schema.rb b/lib/cloud_controller/config_schemas/clock_schema.rb index a822b57ca31..5c4ade54dc0 100644 --- 
a/lib/cloud_controller/config_schemas/clock_schema.rb +++ b/lib/cloud_controller/config_schemas/clock_schema.rb @@ -34,6 +34,9 @@ class ClockSchema < VCAP::Config completed_tasks: { cutoff_age_in_days: Integer }, + delayed_jobs_recover: { + frequency_in_seconds: Integer + }, default_health_check_timeout: Integer, uaa: { diff --git a/lib/cloud_controller/jobs.rb b/lib/cloud_controller/jobs.rb index 9f39f53d152..47d49f4dde9 100644 --- a/lib/cloud_controller/jobs.rb +++ b/lib/cloud_controller/jobs.rb @@ -25,6 +25,7 @@ require 'jobs/runtime/expired_blob_cleanup' require 'jobs/runtime/expired_orphaned_blob_cleanup' require 'jobs/runtime/expired_resource_cleanup' +require 'jobs/runtime/delayed_jobs_recover' require 'jobs/runtime/failed_jobs_cleanup' require 'jobs/runtime/service_operations_initial_cleanup' require 'jobs/runtime/legacy_jobs' diff --git a/lib/tasks/jobs.rake b/lib/tasks/jobs.rake index 86c33fe9208..74946b32a25 100644 --- a/lib/tasks/jobs.rake +++ b/lib/tasks/jobs.rake @@ -49,6 +49,7 @@ namespace :jobs do 'audit_events', 'failed_jobs', 'service_operations_initial_cleanup', + 'delayed_jobs_recover', 'service_usage_events', 'completed_tasks', 'expired_blob_cleanup', From 1321d2601a1751474ae8b91d4bceaa97480a53d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Thu, 9 Apr 2026 17:46:00 +0200 Subject: [PATCH 2/3] fix: comment is fixed --- app/jobs/runtime/delayed_jobs_recover.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb index 6caa2a11397..e0af549b8c0 100644 --- a/app/jobs/runtime/delayed_jobs_recover.rb +++ b/app/jobs/runtime/delayed_jobs_recover.rb @@ -65,7 +65,7 @@ def reenqueue(pollable, delayed) for_update.first return unless pjob - # bring the record into a clean polling state + # bring the pollable job into the clean polling state pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE) # unwrap the serialized handler 
and re-enqueue via the reoccurring job From a8ffca4b05e13744bb683962e137b58fc0b7fa57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Thu, 9 Apr 2026 18:07:24 +0200 Subject: [PATCH 3/3] feat: new scheduling jobs test is added --- spec/unit/lib/cloud_controller/clock/scheduler_spec.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb index 20d0e49b5bd..beb7a885091 100644 --- a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb +++ b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb @@ -21,6 +21,7 @@ module VCAP::CloudController failed_jobs: { frequency_in_seconds: 400, cutoff_age_in_days: 4, max_number_of_failed_delayed_jobs: 10 }, pollable_jobs: { cutoff_age_in_days: 2 }, service_operations_initial_cleanup: { frequency_in_seconds: 600 }, + delayed_jobs_recover: { frequency_in_seconds: 600 }, service_usage_events: { cutoff_age_in_days: 5 }, completed_tasks: { cutoff_age_in_days: 6 }, pending_droplets: { frequency_in_seconds: 300, expiration_in_seconds: 600 }, @@ -161,6 +162,12 @@ module VCAP::CloudController expect(block.call).to be_instance_of(Jobs::Runtime::ServiceOperationsInitialCleanup) end + expect(clock).to receive(:schedule_frequent_worker_job) do |args, &block| + expect(args).to eql(name: 'delayed_jobs_recover', interval: 600) + expect(Jobs::Runtime::DelayedJobsRecover).to receive(:new).and_call_original + expect(block.call).to be_instance_of(Jobs::Runtime::DelayedJobsRecover) + end + schedule.start end