diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb new file mode 100644 index 00000000000..e0af549b8c0 --- /dev/null +++ b/app/jobs/runtime/delayed_jobs_recover.rb @@ -0,0 +1,91 @@ +module VCAP::CloudController + module Jobs + module Runtime + class DelayedJobsRecover < VCAP::CloudController::Jobs::CCJob + RECOVERABLE_OPERATIONS = %w[ + service_instance.create + ].freeze + + def perform + logger.info('Recover halted delayed jobs') + recover + end + + def max_attempts + 1 + end + + private + + def recover + # find delayed jobs where failed_at is set (permanently failed) + # and still within the max polling duration (not expired) + cutoff_time = Time.now - default_maximum_duration_seconds + dead_delayed_jobs = Delayed::Job. + exclude(failed_at: nil). + where { created_at > cutoff_time }. + order(:created_at). + limit(batch_size) + + dead_delayed_jobs.each do |delayed| + # pollable job state can be POLLING or FAILED depending on whether the failure + # hook managed to persist before the db connection was lost + pollable = PollableJobModel.where(delayed_job_guid: delayed.guid). + where(state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). + first + next unless pollable + next unless RECOVERABLE_OPERATIONS.include?(pollable.operation) + + # last_operation.state must be 'in progress'. This confirms the broker is still + # working on the operation and CC is the one that gave up, not the broker + entity = find_entity(pollable) + next unless entity + next unless entity.last_operation&.state == 'in progress' + + reenqueue(pollable, delayed) + end + end + + def find_entity(pollable) + # TODO: resource_type field can be used + case pollable.operation + when 'service_instance.create' + ManagedServiceInstance.first(guid: pollable.resource_guid) + end + end + + def reenqueue(pollable, delayed) + # re-verify atomically that the pollable job still points to this dead delayed_job. + # if another process already re-enqueued a new job, pollable.delayed_job_guid was + # updated to the new delayed_job's guid, so where clause returns nil and we skip safely. + PollableJobModel.db.transaction do + pjob = PollableJobModel.where(guid: pollable.guid, + delayed_job_guid: delayed.guid, + state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). + for_update.first + return unless pjob + + # bring the pollable job into the clean polling state + pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE) + + # unwrap the serialized handler and re-enqueue via the reoccurring job + inner_job = Jobs::Enqueuer.unwrap_job(delayed.payload_object) + inner_job.send(:enqueue_next_job, pjob) + end + end + + def default_maximum_duration_seconds + Config.config.get(:broker_client_max_async_poll_duration_minutes).minutes + end + + def logger + @logger ||= Steno.logger('cc.background') + end + + def batch_size + 10 + end + end + end + end +end diff --git a/config/cloud_controller.yml b/config/cloud_controller.yml index bdb1f9108f5..35ddd1d1655 100644 --- a/config/cloud_controller.yml +++ b/config/cloud_controller.yml @@ -375,7 +375,8 @@ diego_sync: pending_droplets: frequency_in_seconds: 300 expiration_in_seconds: 42 - +delayed_jobs_recover: + frequency_in_seconds: 600 pending_builds: expiration_in_seconds: 42 frequency_in_seconds: 300 diff --git a/lib/cloud_controller/clock/scheduler.rb b/lib/cloud_controller/clock/scheduler.rb index 388b5db11d5..914dc67e350 100644 --- a/lib/cloud_controller/clock/scheduler.rb +++ b/lib/cloud_controller/clock/scheduler.rb @@ -24,7 +24,8 @@ class Scheduler { name: 'pending_droplets', class: Jobs::Runtime::PendingDropletCleanup }, { name: 'pending_builds', class: Jobs::Runtime::PendingBuildCleanup }, { name: 'failed_jobs', class: Jobs::Runtime::FailedJobsCleanup }, - { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup } + { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup }, + { name: 'delayed_jobs_recover', class: Jobs::Runtime::DelayedJobsRecover } ].freeze def initialize(config) diff --git a/lib/cloud_controller/config_schemas/clock_schema.rb b/lib/cloud_controller/config_schemas/clock_schema.rb index a822b57ca31..5c4ade54dc0 100644 --- a/lib/cloud_controller/config_schemas/clock_schema.rb +++ b/lib/cloud_controller/config_schemas/clock_schema.rb @@ -34,6 +34,9 @@ class ClockSchema < VCAP::Config completed_tasks: { cutoff_age_in_days: Integer }, + delayed_jobs_recover: { + frequency_in_seconds: Integer + }, default_health_check_timeout: Integer, uaa: { diff --git a/lib/cloud_controller/jobs.rb b/lib/cloud_controller/jobs.rb index 9f39f53d152..47d49f4dde9 100644 --- a/lib/cloud_controller/jobs.rb +++ b/lib/cloud_controller/jobs.rb @@ -25,6 +25,7 @@ require 'jobs/runtime/expired_blob_cleanup' require 'jobs/runtime/expired_orphaned_blob_cleanup' require 'jobs/runtime/expired_resource_cleanup' +require 'jobs/runtime/delayed_jobs_recover' require 'jobs/runtime/failed_jobs_cleanup' require 'jobs/runtime/service_operations_initial_cleanup' require 'jobs/runtime/legacy_jobs' diff --git a/lib/tasks/jobs.rake b/lib/tasks/jobs.rake index 86c33fe9208..74946b32a25 100644 --- a/lib/tasks/jobs.rake +++ b/lib/tasks/jobs.rake @@ -49,6 +49,7 @@ namespace :jobs do 'audit_events', 'failed_jobs', 'service_operations_initial_cleanup', + 'delayed_jobs_recover', 'service_usage_events', 'completed_tasks', 'expired_blob_cleanup', diff --git a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb index 20d0e49b5bd..beb7a885091 100644 --- a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb +++ b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb @@ -21,6 +21,7 @@ module VCAP::CloudController failed_jobs: { frequency_in_seconds: 400, cutoff_age_in_days: 4, max_number_of_failed_delayed_jobs: 10 }, pollable_jobs: { cutoff_age_in_days: 2 }, service_operations_initial_cleanup: { frequency_in_seconds: 600 }, + delayed_jobs_recover: { frequency_in_seconds: 600 }, service_usage_events: { cutoff_age_in_days: 5 }, completed_tasks: { cutoff_age_in_days: 6 }, pending_droplets: { frequency_in_seconds: 300, expiration_in_seconds: 600 }, @@ -161,6 +162,12 @@ module VCAP::CloudController expect(block.call).to be_instance_of(Jobs::Runtime::ServiceOperationsInitialCleanup) end + expect(clock).to receive(:schedule_frequent_worker_job) do |args, &block| + expect(args).to eql(name: 'delayed_jobs_recover', interval: 600) + expect(Jobs::Runtime::DelayedJobsRecover).to receive(:new).and_call_original + expect(block.call).to be_instance_of(Jobs::Runtime::DelayedJobsRecover) + end + schedule.start end