My use case for Dask is primarily orchestrating the execution of long-running functions. By long-running, I mean minutes, and sometimes hours. Tasks can be submitted to a cluster in waves, eventually saturating all workers and causing some shorter-running tasks to get queued behind very long-running tasks. Work stealing is moot given that all workers are saturated.
I would like to prevent eager assignment of ready tasks to worker queues, allowing tasks to build up on the scheduler instead. Currently, the minimum worker queue achievable is 1 (i.e., via a worker-saturation setting <= 1.0). This appears to be controlled via distributed.scheduler._task_slots_available():
def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int:
    """Number of tasks that can be sent to this worker without oversaturating it"""
    assert not math.isinf(saturation_factor)
    return max(math.ceil(saturation_factor * ws.nthreads), 1) - (
        len(ws.processing) - len(ws.long_running)
    )
All I ask is to expose a setting to allow a floor of 0 here. Thanks!
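To make the request concrete, here is a minimal sketch of the same arithmetic with the hard-coded 1 replaced by a parameter. The `floor` argument and the `FakeWorkerState` class are hypothetical names used only for illustration. Neither exists in distributed, and this is not a proposed patch.

```python
import math
from dataclasses import dataclass, field


@dataclass
class FakeWorkerState:
    """Hypothetical stand-in for WorkerState with only the fields used below."""

    nthreads: int
    processing: set = field(default_factory=set)
    long_running: set = field(default_factory=set)


def task_slots_available(ws, saturation_factor, floor=1):
    """Same formula as the snippet above, with the fixed 1 replaced by `floor`."""
    assert not math.isinf(saturation_factor)
    return max(math.ceil(saturation_factor * ws.nthreads), floor) - (
        len(ws.processing) - len(ws.long_running)
    )


# An idle single-threaded worker with a saturation factor of 0.0.
idle = FakeWorkerState(nthreads=1)
slots_with_floor_1 = task_slots_available(idle, 0.0, floor=1)  # current behaviour
slots_with_floor_0 = task_slots_available(idle, 0.0, floor=0)  # requested floor

# An idle four-threaded worker at saturation 1.0; the floor plays no role here.
idle_four = FakeWorkerState(nthreads=4)
slots_four_threads = task_slots_available(idle_four, 1.0, floor=0)
```

In this sketch the floor only changes the result when `math.ceil(saturation_factor * ws.nthreads)` falls below it, so how a floor of 0 should interact with the saturation setting would still need to be decided.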